diff --git a/documentation/src/main/jekyll/_data/categories.yml b/documentation/src/main/jekyll/_data/categories.yml index e0fb5512e..400d6797d 100644 --- a/documentation/src/main/jekyll/_data/categories.yml +++ b/documentation/src/main/jekyll/_data/categories.yml @@ -29,6 +29,7 @@ - emit-subscription - logging - context-passing + - replaying-multis - name: Integration guides: diff --git a/documentation/src/main/jekyll/_data/guides.yml b/documentation/src/main/jekyll/_data/guides.yml index fbb541007..86a514151 100644 --- a/documentation/src/main/jekyll/_data/guides.yml +++ b/documentation/src/main/jekyll/_data/guides.yml @@ -319,3 +319,9 @@ context-passing: labels: - intermediate - advanced + +replaying-multis: + title: Replaying Multis + text: Learn how multiple subscribers can replay from a Multi + labels: + - advanced \ No newline at end of file diff --git a/documentation/src/main/jekyll/guides/replaying-multis.adoc b/documentation/src/main/jekyll/guides/replaying-multis.adoc new file mode 100644 index 000000000..75078da5f --- /dev/null +++ b/documentation/src/main/jekyll/guides/replaying-multis.adoc @@ -0,0 +1,91 @@ +:page-layout: guides +:page-guide-id: replaying-multis +:page-liquid: +:include_dir: ../../../../src/test/java/guides/operators +:imagesdir: ../assets/images + +A `Multi` is a _cold-source_: no processing happens until you subscribe. + +While the `broadcast` operator can be used so that multiple subscribers consume a `Multi` events _at the same time_, it does not support replaying items for _late subscribers_: when a subscriber joins after the `Multi` has completed (or failed), then it won't receive any item. + +This is where _replaying_ can be useful. + +== Replaying all events + +Replaying all events from an upstream `Multi` works as follows: + +[source,java,indent=0] +---- +include::{include_dir}/ReplayTest.java[tag=replay-all] +---- + +Both `item_1` and `item_2` trigger new subscriptions, and both lists contain the following elements: + +---- +[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] +---- + +Replaying works by turning `upstream` into a _hot-stream_, meaning that it gets requested `Long.MAX_VALUE` elements. +This is done when the first subscription happens. + +The replay operator stores the items in an internal _replay log_, and then each subscriber gets to replay them. + +[IMPORTANT] +==== +Subscribers demand and cancellation requests are honored while replaying, but `upstream` cannot be cancelled. +Be careful with unbounded streams as you can exhaust memory! +In such cases or when you need to replay large amounts of data, you might opt to use some eventing middleware rather than Mutiny replays. +==== + +== Replaying the last 'n' events + +You can limit the number of elements to replay by using the `upTo` method: + +[source,java,indent=0] +---- +include::{include_dir}/ReplayTest.java[tag=replay-last] +---- + +Each new subscriber gets to replay from the last `n` elements from where the replay log is at subscription time. +For instance the first subscriber can observe all events, while a subscriber that joins 2 seconds later might not observe the earlier events. + +Since `Multi.createFrom().range(0, 10)` is an _immediate_ stream, both `item_1` and `item_2` lists contain the last items: + +---- +[7, 8, 9] +---- + +== Prepending with seed data + +In some cases you might want to prepend some _seed_ data that will be available for replay before the upstream starts emitting. + +You can do so using an `Iterable` to provide such seed data: + +[source,java,indent=0] +---- +include::{include_dir}/ReplayTest.java[tag=replay-seed] +---- + +In which case subscribers can observe the following events: + +---- +[-10, -5, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] +---- + +== Replay of failures and completions + +Subscribers get to observe not just items but also the failure and completion events: + +[source,java,indent=0] +---- +include::{include_dir}/ReplayTest.java[tag=replay-errors] +---- + +Running this code yields the following output for any subscriber: + +---- + -> 7 + -> 8 + -> 9 +Failed: boom +---- diff --git a/documentation/src/test/java/guides/operators/ReplayTest.java b/documentation/src/test/java/guides/operators/ReplayTest.java new file mode 100644 index 000000000..268421678 --- /dev/null +++ b/documentation/src/test/java/guides/operators/ReplayTest.java @@ -0,0 +1,82 @@ +package guides.operators; + +import io.smallrye.mutiny.Multi; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ReplayTest { + + @Test + public void replayAll() { + // tag::replay-all[] + Multi upstream = Multi.createFrom().range(0, 10); + + Multi replay = Multi.createBy().replaying().ofMulti(upstream); + + List items_1 = replay.collect().asList().await().indefinitely(); + List items_2 = replay.collect().asList().await().indefinitely(); + // end::replay-all[] + + assertThat(items_1) + .isEqualTo(items_2) + .hasSize(10); + System.out.println(items_1); + } + + @Test + public void replayLast() { + // tag::replay-last[] + Multi upstream = Multi.createFrom().range(0, 10); + + Multi replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream); + + List items_1 = replay.collect().asList().await().indefinitely(); + List items_2 = replay.collect().asList().await().indefinitely(); + // end::replay-last[] + + assertThat(items_1) + .isEqualTo(items_2) + .hasSize(3); + System.out.println(items_1); + } + + @Test + public void replayWithSeed() { + // tag::replay-seed[] + Multi upstream = Multi.createFrom().range(0, 10); + Iterable seed = Arrays.asList(-10, -5, -1); + + Multi replay = Multi.createBy().replaying().ofSeedAndMulti(seed, upstream); + + List items_1 = replay.collect().asList().await().indefinitely(); + List items_2 = replay.collect().asList().await().indefinitely(); + // end::replay-seed[] + + assertThat(items_1) + .isEqualTo(items_2) + .hasSize(13); + System.out.println(items_1); + } + + @Test + public void errors() { + // tag::replay-errors[] + Multi upstream = Multi.createBy().concatenating().streams( + Multi.createFrom().range(0, 10), + Multi.createFrom().failure(() -> new IOException("boom")) + ); + + Multi replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream); + + replay.subscribe().with( + n -> System.out.println(" -> " + n), + failure -> System.out.println("Failed: " + failure.getMessage()), + () -> System.out.println("Completed")); + // end::replay-errors[] + } +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreateBy.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreateBy.java index ee736baa5..6d4b115d9 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreateBy.java +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiCreateBy.java @@ -1,6 +1,7 @@ package io.smallrye.mutiny.groups; import io.smallrye.common.annotation.CheckReturnValue; +import io.smallrye.common.annotation.Experimental; import io.smallrye.mutiny.Multi; /** @@ -72,4 +73,15 @@ public MultiRepetition repeating() { return new MultiRepetition(); } + /** + * Creates a new {@link Multi} that replays elements from another {@link Multi} to any number of current and late + * subscribers. + * + * @return the object to configure the replay behavior + */ + @CheckReturnValue + @Experimental("Replaying of Multi is an experimental feature in Mutiny 1.4.0") + public MultiReplay replaying() { + return new MultiReplay(); + } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/groups/MultiReplay.java b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiReplay.java new file mode 100644 index 000000000..79c5dd928 --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/groups/MultiReplay.java @@ -0,0 +1,79 @@ +package io.smallrye.mutiny.groups; + +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; +import static io.smallrye.mutiny.helpers.ParameterValidation.positive; + +import io.smallrye.common.annotation.Experimental; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.operators.multi.replay.ReplayOperator; + +/** + * Group to configure replaying a {@link Multi} to multiple subscribers. + */ +@Experimental("Replaying of Multi is an experimental feature in Mutiny 1.4.0") +public class MultiReplay { + + private long numberOfItemsToReplay = Long.MAX_VALUE; + + /** + * Limit the number of items each new subscriber gets. + * The default is to replay all events. + * + * @param numberOfItemsToReplay a strictly positive number of items to be replayed, where {@code Long.MAX_VALUE} means + * replaying all events + * @return this group + */ + public MultiReplay upTo(long numberOfItemsToReplay) { + this.numberOfItemsToReplay = positive(numberOfItemsToReplay, "numberOfItemsToReplay"); + return this; + } + + /** + * Create a replay {@link Multi}. + *

+ * Replaying work as follows. + *

    + *
  1. The provided {@code upstream} {@link Multi} is turned into a hot-stream as it gets requested {@code Long.MAX_VALUE} + * elements. + * This happens at the first subscription request. Note that {@code upstream} will never be cancelled.
  2. + *
  3. Each new subscriber to this replay {@link Multi} is able to replay items at its own pace (back-pressure is + * honored).
  4. + *
  5. When the number of items to replay is limited using {@link #upTo(long)}, then a new subscriber gets to replay + * starting from the current position in the upstream replay log. + * When the number of elements to replay is unbounded, then a new subscriber replays from the start.
  6. + *
  7. All current and late subscribers observe terminal completion / error signals.
  8. + *
  9. Items are pushed synchronously to subscribers when they call {@link org.reactivestreams.Subscription#request(long)} + * and there are enough elements to satisfy a part of the demand. + * Otherwise items are pushed from the upstream to all subscribers with an outstanding demand.
  10. + *
+ *

+ * Replaying a large number of elements can be costly, as items have to be kept in-memory. + * It is not recommended using this operator with unbounded streams, especially as they can't be cancelled (the subscribers + * can cancel replays, though). + * In such cases and especially when you have to keep replay data around for a long time then some eventing middleware might + * be a better fit. + * + * @param upstream the {@link Multi} to replay, must not be {@code null} + * @param the items type + * @return a replaying {@link Multi} + */ + public Multi ofMulti(Multi upstream) { + return new ReplayOperator<>(nonNull(upstream, "upstream"), numberOfItemsToReplay); + } + + /** + * Create a replay {@link Multi} with some seed elements inserted before the provided {@link Multi} items. + *

+ * The behavior is that of {@link #ofMulti(Multi)}, except that the items from {@code seed} are prepended to those from + * {@code upstream} in the replay log. + * + * @param seed the seed elements, must not be {@code null}, must not contain any {@code null} element + * @param upstream the {@link Multi} to replay, must not be {@code null} + * @param the items type + * @return a replaying {@link Multi} + * @see #ofMulti(Multi) + */ + public Multi ofSeedAndMulti(Iterable seed, Multi upstream) { + return new ReplayOperator<>(nonNull(upstream, "upstream"), numberOfItemsToReplay, nonNull(seed, "seed")); + } +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java new file mode 100644 index 000000000..b3d9aaf5d --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayList.java @@ -0,0 +1,151 @@ +package io.smallrye.mutiny.operators.multi.replay; + +import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; + +/* + * Replay is being captured using a custom linked list, while consumers can make progress using cursors. + * + * The "start" depends on the replay semantics: + * - zero for unbounded replays, + * - the last n elements before the tail for bounded replays. + * + * From there each cursor (1 per subscriber) can make progress at its own pace. + * + * The code assumes reactive streams semantics, especially that there are no concurrent appends because of + * serial events. + * + * Bounded replays shall have earlier cells before the head be eventually garbage collected as there are only forward + * references. + */ +public class AppendOnlyReplayList { + + public class Cursor { + + private Cell current = SENTINEL_EMPTY; + private boolean start = true; + private boolean currentHasBeenRead = false; + + public boolean hasNext() { + if (current == SENTINEL_EMPTY) { + Cell currentHead = head; + if (currentHead != SENTINEL_EMPTY) { + current = currentHead; + return true; + } else { + return false; + } + } else if (!currentHasBeenRead) { + return true; + } else { + return current.next != SENTINEL_END; + } + } + + public void moveToNext() { + if (start) { + start = false; + return; + } + assert current.next != SENTINEL_END; + current = current.next; + currentHasBeenRead = false; + } + + public Object read() { + currentHasBeenRead = true; + return current.value; + } + + public boolean hasReachedCompletion() { + return current.value instanceof Completion; + } + + public boolean hasReachedFailure() { + return current.value instanceof Failure; + } + + public Throwable readFailure() { + currentHasBeenRead = true; + return ((Failure) current.value).failure; + } + + public void readCompletion() { + currentHasBeenRead = true; + } + } + + private static abstract class Terminal { + + } + + private static final class Completion extends Terminal { + + } + + private static final class Failure extends Terminal { + final Throwable failure; + + Failure(Throwable failure) { + this.failure = failure; + } + } + + private static class Cell { + final Object value; + volatile Cell next; + + Cell(Object value, Cell next) { + this.value = value; + this.next = next; + } + } + + private static final Cell SENTINEL_END = new Cell(null, null); + private static final Cell SENTINEL_EMPTY = new Cell(null, SENTINEL_END); + + private final long itemsToReplay; + private long numberOfItemsRecorded = 0L; + private volatile Cell head = SENTINEL_EMPTY; + private volatile Cell tail = SENTINEL_EMPTY; + + public AppendOnlyReplayList(long numberOfItemsToReplay) { + this(numberOfItemsToReplay, null); + } + + public AppendOnlyReplayList(long numberOfItemsToReplay, Iterable seed) { + assert numberOfItemsToReplay > 0; + this.itemsToReplay = numberOfItemsToReplay; + if (seed != null) { + seed.forEach(this::push); + } + } + + public void push(Object item) { + assert !(tail.value instanceof Terminal); + Cell newCell = new Cell(nonNull(item, "item"), SENTINEL_END); + if (head == SENTINEL_EMPTY) { + head = newCell; + } else { + tail.next = newCell; + } + tail = newCell; + if (itemsToReplay != Long.MAX_VALUE && !(item instanceof Terminal)) { + numberOfItemsRecorded++; + if (numberOfItemsRecorded > itemsToReplay) { + head = head.next; + } + } + } + + public void pushFailure(Throwable failure) { + push(new Failure(failure)); + } + + public void pushCompletion() { + push(new Completion()); + } + + public Cursor newCursor() { + return new Cursor(); + } +} diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java new file mode 100644 index 000000000..06f0cf32f --- /dev/null +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/replay/ReplayOperator.java @@ -0,0 +1,178 @@ +package io.smallrye.mutiny.operators.multi.replay; + +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Subscription; + +import io.smallrye.mutiny.Context; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.Subscriptions; +import io.smallrye.mutiny.operators.AbstractMulti; +import io.smallrye.mutiny.subscription.ContextSupport; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +public class ReplayOperator extends AbstractMulti { + + private final Multi upstream; + + private final AppendOnlyReplayList replayList; + + private final AtomicBoolean upstreamSubscriptionRequested = new AtomicBoolean(); + private volatile Subscription upstreamSubscription = null; + private final CopyOnWriteArrayList subscriptions = new CopyOnWriteArrayList<>(); + + public ReplayOperator(Multi upstream, long numberOfItemsToReplay) { + this.upstream = upstream; + this.replayList = new AppendOnlyReplayList(numberOfItemsToReplay); + } + + public ReplayOperator(Multi upstream, long numberOfItemsToReplay, Iterable seed) { + this.upstream = upstream; + this.replayList = new AppendOnlyReplayList(numberOfItemsToReplay, seed); + } + + @Override + public void subscribe(MultiSubscriber subscriber) { + if (upstreamSubscriptionRequested.compareAndSet(false, true)) { + upstream.subscribe(new UpstreamSubscriber(subscriber)); + } + ReplaySubscription replaySubscription = new ReplaySubscription(subscriber); + subscriber.onSubscribe(replaySubscription); + subscriptions.add(replaySubscription); + } + + private class ReplaySubscription implements Subscription { + + private final MultiSubscriber downstream; + private final AtomicLong demand = new AtomicLong(); + private volatile boolean done = false; + private final AppendOnlyReplayList.Cursor cursor; + + private ReplaySubscription(MultiSubscriber downstream) { + this.downstream = downstream; + this.cursor = replayList.newCursor(); + this.cursor.hasNext(); // Try to catch the replay stream at subscription time if ready + } + + @Override + public void request(long n) { + if (done) { + return; + } + if (n <= 0) { + cancel(); + downstream.onFailure(Subscriptions.getInvalidRequestException()); + return; + } + Subscriptions.add(demand, n); + if (cursor.hasNext()) { + drain(); + } + } + + @Override + public void cancel() { + done = true; + subscriptions.remove(this); + } + + private final AtomicInteger wip = new AtomicInteger(); + + @SuppressWarnings("unchecked") + private void drain() { + if (done) { + return; + } + if (wip.getAndIncrement() > 0) { + return; + } + while (true) { + if (done) { + return; + } + long max = demand.get(); + long emitted = 0; + while (emitted < max && cursor.hasNext()) { + if (done) { + return; + } + cursor.moveToNext(); + if (cursor.hasReachedCompletion()) { + cancel(); + cursor.readCompletion(); + downstream.onComplete(); + return; + } + if (cursor.hasReachedFailure()) { + cancel(); + downstream.onFailure(cursor.readFailure()); + return; + } + T item = (T) cursor.read(); + assert item != null; // Invariant enforced by AppendOnlyReplayList + downstream.onItem(item); + emitted++; + } + demand.addAndGet(-emitted); + if (wip.decrementAndGet() == 0) { + return; + } + } + } + } + + private class UpstreamSubscriber implements MultiSubscriber, ContextSupport { + + private final MultiSubscriber initialSubscriber; + + public UpstreamSubscriber(MultiSubscriber initialSubscriber) { + this.initialSubscriber = initialSubscriber; + } + + @Override + public void onItem(T item) { + replayList.push(item); + triggerDrainLoops(); + } + + @Override + public void onFailure(Throwable failure) { + replayList.pushFailure(failure); + markAsDone(); + triggerDrainLoops(); + } + + @Override + public void onCompletion() { + replayList.pushCompletion(); + markAsDone(); + triggerDrainLoops(); + } + + @Override + public void onSubscribe(Subscription subscription) { + upstreamSubscription = subscription; + upstreamSubscription.request(Long.MAX_VALUE); + } + + @Override + public Context context() { + if (initialSubscriber instanceof ContextSupport) { + return ((ContextSupport) initialSubscriber).context(); + } else { + return Context.empty(); + } + } + + private void triggerDrainLoops() { + subscriptions.forEach(ReplaySubscription::drain); + } + + private void markAsDone() { + upstreamSubscription = Subscriptions.CANCELLED; + } + } +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java new file mode 100644 index 000000000..83e097edd --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/groups/MultiReplayTest.java @@ -0,0 +1,305 @@ +package io.smallrye.mutiny.groups; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.awaitility.Awaitility.await; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; + +import io.smallrye.mutiny.Context; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.subscription.MultiSubscriber; + +class MultiReplayTest { + + private Random random = new Random(); + + @Test + void rejectBadBuilderArguments() { + assertThatThrownBy(() -> Multi.createBy().replaying().ofMulti(null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be `null`"); + + assertThatThrownBy(() -> Multi.createBy().replaying().ofSeedAndMulti(null, Multi.createFrom().item(123))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be `null`"); + + assertThatThrownBy(() -> Multi.createBy().replaying().ofSeedAndMulti(new ArrayList(), null)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must not be `null`"); + + assertThatThrownBy(() -> Multi.createBy().replaying().upTo(0)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be greater than zero"); + + assertThatThrownBy(() -> Multi.createBy().replaying().upTo(-10)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("must be greater than zero"); + } + + @Test + void basicReplayAll() { + Multi upstream = Multi.createFrom().range(1, 10); + Multi replay = Multi.createBy().replaying().ofMulti(upstream); + + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(Long.MAX_VALUE); + assertThat(sub.getItems()).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9); + sub.assertCompleted(); + + sub = replay.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(4); + assertThat(sub.getItems()).containsExactly(1, 2, 3, 4); + sub.request(Long.MAX_VALUE); + assertThat(sub.getItems()).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9); + sub.assertCompleted(); + } + + @Test + void basicReplayLatest3() { + ExecutorService pool = Executors.newFixedThreadPool(1); + AtomicBoolean step = new AtomicBoolean(); + + Multi upstream = Multi.createFrom(). emitter(emitter -> { + await().untilTrue(step); + for (int i = 0; i <= 10; i++) { + emitter.emit(i); + } + emitter.complete(); + }).runSubscriptionOn(pool); + Multi replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream); + + try { + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + step.set(true); + sub.awaitCompletion(); + assertThat(sub.getItems()).containsExactly(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + sub = replay.subscribe().withSubscriber(AssertSubscriber.create()); + sub.request(1); + assertThat(sub.getItems()).containsExactly(8); + sub.request(1); + assertThat(sub.getItems()).containsExactly(8, 9); + sub.request(3000); + assertThat(sub.getItems()).containsExactly(8, 9, 10); + sub.assertCompleted(); + } finally { + pool.shutdownNow(); + } + } + + @Test + void replayLast3AfterFailure() { + Multi upstream = Multi.createBy().concatenating().streams( + Multi.createFrom().range(1, 10), + Multi.createFrom().failure(() -> new IOException("boom"))); + Multi replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream); + + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + sub.assertFailedWith(IOException.class, "boom"); + sub.assertItems(7, 8, 9); + + sub = replay.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + sub.assertFailedWith(IOException.class, "boom"); + sub.assertItems(7, 8, 9); + } + + @Test + void replayWithSeed() { + List seed = Arrays.asList(-100, -10, -1); + Multi upstream = Multi.createFrom().range(0, 11); + Multi replay = Multi.createBy().replaying().ofSeedAndMulti(seed, upstream); + + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + sub.assertCompleted(); + assertThat(sub.getItems()).containsExactly(-100, -10, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + void forbidSeedWithNull() { + List seed = Arrays.asList(-100, -10, -1, null); + Multi upstream = Multi.createFrom().range(0, 11); + assertThatThrownBy(() -> Multi.createBy().replaying().ofSeedAndMulti(seed, upstream)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("`item` must not be `null`"); + } + + @Test + void rejectBadRequests() { + List seed = Arrays.asList(-100, -10, -1); + Multi upstream = Multi.createFrom().range(0, 11); + Multi replay = Multi.createBy().replaying().ofSeedAndMulti(seed, upstream); + + DirectSubscriber sub = replay.subscribe().withSubscriber(new DirectSubscriber<>()); + sub.request(-1); + assertThat(sub.failure).isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid request number, must be greater than 0"); + + sub = replay.subscribe().withSubscriber(new DirectSubscriber<>()); + sub.request(0); + assertThat(sub.failure).isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid request number, must be greater than 0"); + } + + @Test + void acceptContext() { + Multi upstream = Multi.createFrom().range(1, 10) + .withContext((multi, ctx) -> multi.onItem().transform(n -> n + "::" + ctx.get("foo"))); + Multi replay = Multi.createBy().replaying().upTo(3).ofMulti(upstream); + + AssertSubscriber sub = replay.subscribe() + .withSubscriber(AssertSubscriber.create(Context.of("foo", "foo-bar"), Long.MAX_VALUE)); + sub.assertCompleted(); + assertThat(sub.getItems()).containsExactly("7::foo-bar", "8::foo-bar", "9::foo-bar"); + + sub = replay.subscribe().withSubscriber(AssertSubscriber.create(Context.of("foo", "foo-bar-baz"), Long.MAX_VALUE)); + sub.assertCompleted(); + assertThat(sub.getItems()).containsExactly("7::foo-bar", "8::foo-bar", "9::foo-bar"); + } + + @Test + void acceptCancellation() { + Multi replay = Multi.createBy().replaying().ofMulti(Multi.createFrom().range(1, 10)); + + DirectSubscriber sub = replay.subscribe().withSubscriber(new DirectSubscriber<>()); + sub.request(Long.MAX_VALUE); + assertThat(sub.completed).isTrue(); + assertThat(sub.items).containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9); + + sub = replay.subscribe().withSubscriber(new DirectSubscriber<>()); + sub.request(3); + assertThat(sub.completed).isFalse(); + assertThat(sub.items).containsExactly(1, 2, 3); + sub.cancel(); + sub.request(Long.MAX_VALUE); + assertThat(sub.completed).isFalse(); + assertThat(sub.items).containsExactly(1, 2, 3); + + sub = replay.subscribe().withSubscriber(new DirectSubscriber<>()); + sub.request(5); + assertThat(sub.completed).isFalse(); + assertThat(sub.items).containsExactly(1, 2, 3, 4, 5); + sub.request(1); + sub.cancel(); + sub.request(Long.MAX_VALUE); + assertThat(sub.completed).isFalse(); + assertThat(sub.items).containsExactly(1, 2, 3, 4, 5, 6); + } + + // Avoid wrapping that does not allow us to test cancellation and requests directly on the operator + static class DirectSubscriber implements MultiSubscriber { + + private final ArrayList items = new ArrayList<>(); + private Throwable failure; + private boolean completed; + private Subscription subscription; + + @Override + public void onItem(T item) { + items.add(item); + } + + @Override + public void onFailure(Throwable failure) { + this.failure = failure; + } + + @Override + public void onCompletion() { + completed = true; + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + } + + public void request(long demand) { + subscription.request(demand); + } + + public void cancel() { + subscription.cancel(); + } + } + + @Test + void raceBetweenPushAndCancel() throws InterruptedException, TimeoutException { + ExecutorService pool = Executors.newCachedThreadPool(); + try { + + final int N = 32; + CountDownLatch startLatch = new CountDownLatch(N); + CountDownLatch endLatch = new CountDownLatch(N); + + Multi upstream = Multi.createFrom(). emitter(emitter -> { + try { + startLatch.await(); + } catch (InterruptedException e) { + emitter.fail(e); + } + long i = 0; + while (endLatch.getCount() != 0) { + emitter.emit(i++); + } + emitter.complete(); + }).runSubscriptionOn(pool); + + Multi replay = Multi.createBy().replaying().ofMulti(upstream) + .runSubscriptionOn(pool); + + CopyOnWriteArrayList> items = new CopyOnWriteArrayList<>(); + for (int i = 0; i < N; i++) { + AssertSubscriber sub = replay.subscribe().withSubscriber(AssertSubscriber.create()); + pool.submit(() -> { + startLatch.countDown(); + randomSleep(); + sub.request(Long.MAX_VALUE); + randomSleep(); + sub.cancel(); + items.add(sub.getItems()); + endLatch.countDown(); + }); + } + + if (!endLatch.await(10, TimeUnit.SECONDS)) { + throw new TimeoutException("The test did not finish within 10 seconds"); + } + + assertThat(items).hasSize(N); + items.forEach(list -> { + if (list.isEmpty()) { + // Might happen due to subscriber timing + return; + } + assertThat(list).isNotEmpty(); + AtomicLong prev = new AtomicLong(list.get(0)); + list.stream().skip(1).forEach(n -> { + assertThat(n).isEqualTo(prev.get() + 1); + prev.set(n); + }); + }); + } finally { + pool.shutdownNow(); + } + } + + private void randomSleep() { + try { + Thread.sleep(250 + random.nextInt(250)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayListTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayListTest.java new file mode 100644 index 000000000..77d91fbb7 --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/replay/AppendOnlyReplayListTest.java @@ -0,0 +1,287 @@ +package io.smallrye.mutiny.operators.multi.replay; + +import static org.assertj.core.api.Assertions.*; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assumptions.assumeFalse; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Random; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; + +class AppendOnlyReplayListTest { + + private Random random = new Random(); + + @Test + void checkReadyAtStart() { + AppendOnlyReplayList replayList = new AppendOnlyReplayList(Long.MAX_VALUE); + AppendOnlyReplayList.Cursor cursor = replayList.newCursor(); + + assertThat(cursor.hasNext()).isFalse(); + replayList.push("foo"); + replayList.push("bar"); + assertThat(cursor.hasNext()).isTrue(); + cursor.moveToNext(); + assertThat(cursor.read()).isEqualTo("foo"); + assertThat(cursor.hasNext()).isTrue(); + cursor.moveToNext(); + assertThat(cursor.hasNext()).isTrue(); + assertThat(cursor.read()).isEqualTo("bar"); + assertThat(cursor.hasNext()).isFalse(); + } + + @Test + void pushSomeItemsAndComplete() { + AppendOnlyReplayList replayList = new AppendOnlyReplayList(Long.MAX_VALUE); + ArrayList reference = new ArrayList<>(); + + AppendOnlyReplayList.Cursor firstCursor = replayList.newCursor(); + assertThat(firstCursor.hasNext()).isFalse(); + for (int i = 0; i < 20; i++) { + replayList.push(i); + reference.add(i); + } + replayList.pushCompletion(); + AppendOnlyReplayList.Cursor secondCursor = replayList.newCursor(); + + checkCompletedWithAllItems(reference, firstCursor); + checkCompletedWithAllItems(reference, secondCursor); + } + + private void checkCompletedWithAllItems(ArrayList reference, AppendOnlyReplayList.Cursor cursor) { + ArrayList proof = new ArrayList<>(); + assertThat(cursor.hasNext()).isTrue(); + while (cursor.hasNext()) { + cursor.moveToNext(); + if (cursor.hasReachedCompletion()) { + cursor.readCompletion(); + assumeFalse(cursor.hasNext()); + break; + } + assumeFalse(cursor.hasReachedFailure()); + proof.add((Integer) cursor.read()); + } + assertThat(proof).isEqualTo(reference); + } + + @Test + void pushSomeItemsAndFail() { + AppendOnlyReplayList replayList = new AppendOnlyReplayList(Long.MAX_VALUE); + ArrayList reference = new ArrayList<>(); + + AppendOnlyReplayList.Cursor firstCursor = replayList.newCursor(); + for (int i = 0; i < 20; i++) { + replayList.push(i); + reference.add(i); + } + replayList.pushFailure(new IOException("woops")); + AppendOnlyReplayList.Cursor secondCursor = replayList.newCursor(); + + checkFailedWithAllItems(reference, firstCursor, IOException.class, "woops"); + checkFailedWithAllItems(reference, secondCursor, IOException.class, "woops"); + } + + private void checkFailedWithAllItems(ArrayList reference, AppendOnlyReplayList.Cursor cursor, Class failureType, + String failureMessage) { + ArrayList proof = new ArrayList<>(); + assertThat(cursor.hasNext()).isTrue(); + while (cursor.hasNext()) { + cursor.moveToNext(); + if (cursor.hasReachedFailure()) { + assertThat(cursor.readFailure()).isInstanceOf(failureType).hasMessage(failureMessage); + assumeFalse(cursor.hasNext()); + break; + } + assumeFalse(cursor.hasReachedCompletion()); + proof.add((Integer) cursor.read()); + } + assertThat(proof).isEqualTo(reference); + assertThat(cursor.hasNext()).isFalse(); + assertThat(cursor.hasReachedFailure()).isTrue(); + + } + + @Test + void boundedReplay() { + AppendOnlyReplayList replayList = new AppendOnlyReplayList(3); + replayList.push(1); + replayList.push(2); + + AppendOnlyReplayList.Cursor firstCursor = replayList.newCursor(); + assertThat(firstCursor.hasNext()).isTrue(); + firstCursor.moveToNext(); + assertThat(firstCursor.read()).isEqualTo(1); + assertThat(firstCursor.hasNext()).isTrue(); + firstCursor.moveToNext(); + assertThat(firstCursor.read()).isEqualTo(2); + assertThat(firstCursor.hasNext()).isFalse(); + + AppendOnlyReplayList.Cursor secondCursor = replayList.newCursor(); + replayList.push(3); + replayList.push(4); + replayList.push(5); + + assertThat(secondCursor.hasNext()).isTrue(); + secondCursor.moveToNext(); + assertThat(secondCursor.read()).isEqualTo(3); + secondCursor.moveToNext(); + assertThat(secondCursor.read()).isEqualTo(4); + secondCursor.moveToNext(); + assertThat(secondCursor.read()).isEqualTo(5); + assertThat(secondCursor.hasNext()).isFalse(); + + assertThat(firstCursor.hasNext()).isTrue(); + firstCursor.moveToNext(); + assertThat(firstCursor.read()).isEqualTo(3); + firstCursor.moveToNext(); + assertThat(firstCursor.read()).isEqualTo(4); + + replayList.push(6); + replayList.pushFailure(new IOException("boom")); + + AppendOnlyReplayList.Cursor lateCursor = replayList.newCursor(); + assertThat(lateCursor.hasNext()).isTrue(); + lateCursor.moveToNext(); + assertThat(lateCursor.read()).isEqualTo(4); + assertThat(lateCursor.hasNext()).isTrue(); + lateCursor.moveToNext(); + assertThat(lateCursor.read()).isEqualTo(5); + assertThat(lateCursor.hasNext()).isTrue(); + lateCursor.moveToNext(); + assertThat(lateCursor.read()).isEqualTo(6); + assertThat(lateCursor.hasNext()).isTrue(); + lateCursor.moveToNext(); + assertThat(lateCursor.hasReachedFailure()).isTrue(); + assertThat(lateCursor.hasNext()).isTrue(); + assertThat(lateCursor.readFailure()).isInstanceOf(IOException.class).hasMessage("boom"); + assertThat(lateCursor.hasNext()).isFalse(); + } + + @Test + void concurrencySanityChecks() { + final int N_CONSUMERS = 4; + AppendOnlyReplayList replayList = new AppendOnlyReplayList(256); + AtomicBoolean stop = new AtomicBoolean(); + AtomicLong counter = new AtomicLong(); + AtomicLong success = new AtomicLong(); + ConcurrentLinkedDeque problems = new ConcurrentLinkedDeque<>(); + ExecutorService pool = Executors.newCachedThreadPool(); + + pool.submit(() -> { + randomSleep(); + long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < 5000L) { + for (int i = 0; i < 5000; i++) { + replayList.push(counter.getAndIncrement()); + } + } + stop.set(true); + }); + + for (int i = 0; i < N_CONSUMERS; i++) { + pool.submit(() -> { + AppendOnlyReplayList.Cursor cursor = replayList.newCursor(); + randomSleep(); + while (!cursor.hasNext()) { + // await + } + cursor.moveToNext(); + long previous = (long) cursor.read(); + while (!stop.get()) { + if (!cursor.hasNext()) { + continue; + } + cursor.moveToNext(); + long current = (long) cursor.read(); + if (current != previous + 1) { + problems.add("Broken sequence " + previous + " -> " + current); + return; + } + previous = current; + success.incrementAndGet(); + } + }); + } + + await().untilTrue(stop); + pool.shutdownNow(); + assertThat(problems).isEmpty(); + assertThat(success.get()).isGreaterThan(counter.get()); + } + + private void randomSleep() { + try { + Thread.sleep(250 + random.nextInt(250)); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Test + void seedUnbounded() { + List seed = IntStream.range(1, 9).boxed().collect(Collectors.toList()); + AppendOnlyReplayList replayList = new AppendOnlyReplayList(Long.MAX_VALUE, seed); + replayList.push(9); + replayList.push(10); + replayList.pushCompletion(); + + List expected = IntStream.range(1, 11).boxed().collect(Collectors.toList()); + ArrayList check = new ArrayList<>(); + AppendOnlyReplayList.Cursor cursor = replayList.newCursor(); + while (cursor.hasNext()) { + cursor.moveToNext(); + if (cursor.hasReachedCompletion()) { + break; + } + check.add((Integer) cursor.read()); + } + + assertThat(cursor.hasReachedCompletion()).isTrue(); + assertThat(check).isEqualTo(expected); + } + + @Test + void seedBounded() { + List seed = IntStream.range(1, 9).boxed().collect(Collectors.toList()); + AppendOnlyReplayList replayList = new AppendOnlyReplayList(4, seed); + replayList.push(9); + replayList.push(10); + replayList.pushCompletion(); + + List expected = Arrays.asList(7, 8, 9, 10); + ArrayList check = new ArrayList<>(); + AppendOnlyReplayList.Cursor cursor = replayList.newCursor(); + while (cursor.hasNext()) { + cursor.moveToNext(); + if (cursor.hasReachedCompletion()) { + break; + } + check.add((Integer) cursor.read()); + } + + assertThat(cursor.hasReachedCompletion()).isTrue(); + assertThat(check).isEqualTo(expected); + } + + @Test + void forbidNull() { + assertThatThrownBy(() -> { + List seed = Arrays.asList("foo", "bar", null); + AppendOnlyReplayList replayList = new AppendOnlyReplayList(Long.MAX_VALUE, seed); + AppendOnlyReplayList.Cursor cursor = replayList.newCursor(); + while (cursor.hasNext()) { + cursor.moveToNext(); + cursor.read(); + } + }).isInstanceOf(IllegalArgumentException.class).hasMessage("`item` must not be `null`"); + } +} diff --git a/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiReplayTckTest.java b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiReplayTckTest.java new file mode 100644 index 000000000..f6a9dc695 --- /dev/null +++ b/reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiReplayTckTest.java @@ -0,0 +1,21 @@ +package io.smallrye.mutiny.tcktests; + +import org.reactivestreams.Publisher; +import org.testng.annotations.Ignore; + +import io.smallrye.mutiny.Multi; + +public class MultiReplayTckTest extends AbstractPublisherTck { + + @Override + public Publisher createPublisher(long elements) { + Multi upstream = upstream(elements); + return Multi.createBy().replaying().ofMulti(upstream); + } + + @Override + @Ignore + public void required_spec317_mustNotSignalOnErrorWhenPendingAboveLongMaxValue() { + // The broadcast is capping at Long.MAX. + } +}