From 328359ab12e5732f7c23f85e4385184f90c11b17 Mon Sep 17 00:00:00 2001 From: Clement Escoffier Date: Sat, 17 Apr 2021 14:07:26 +0200 Subject: [PATCH] The collect().last() operator was requesting Long.MAX but also propagating the requests from the downstream generating useless requests to the upstream. --- .../operators/multi/MultiLastItemOp.java | 5 +++ .../mutiny/operators/MultiCollectTest.java | 23 +++++++++++ .../operators/MultiSelectFirstOrLast.java | 40 ++++++++++++++----- 3 files changed, 57 insertions(+), 11 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java index c7c6eef8e..903e85684 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java @@ -36,6 +36,11 @@ public void onSubscribe(Subscription subscription) { } } + @Override + public void request(long numberOfItems) { + // Ignored, we already requested Long.MAX + } + @Override public void onItem(T item) { last = item; diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java index abe072327..b001b8d2c 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiCollectTest.java @@ -5,6 +5,7 @@ import java.io.IOException; import java.time.Duration; import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.BinaryOperator; import java.util.function.Function; @@ -367,6 +368,28 @@ public void testCollectWhenWithFailure() { .hasMessageContaining("boom"); } + @Test + public void testThatLastEmitASingleRequest() { + AtomicInteger counter = new AtomicInteger(); + Multi.createFrom().items("a", "b", "c") + .onRequest().invoke(counter::incrementAndGet) + .collect().last() + .await().indefinitely(); + + assertThat(counter).hasValue(1); + } + + @Test + public void testThatFirstEmitASingleRequest() { + AtomicInteger counter = new AtomicInteger(); + Multi.createFrom().items("a", "b", "c") + .onRequest().invoke(counter::incrementAndGet) + .collect().first() + .await().indefinitely(); + + assertThat(counter).hasValue(1); + } + static class Person { private final String firstName; diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLast.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLast.java index 7ffa4f06b..4c7e7a3c3 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLast.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiSelectFirstOrLast.java @@ -10,9 +10,11 @@ import java.io.IOException; import java.time.Duration; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -25,27 +27,40 @@ public class MultiSelectFirstOrLast { + private AtomicInteger counter; + + @BeforeEach + public void init() { + counter = new AtomicInteger(); + } + @Test public void testSelectFirstWithLimit() { - List list = Multi.createFrom().range(1, 5).select().first(2) - .collectItems().asList().await().indefinitely(); + List list = Multi.createFrom().range(1, 5) + .onRequest().invoke(() -> counter.incrementAndGet()) + .select().first(2) + .collect().asList().await().indefinitely(); assertThat(list).containsExactly(1, 2); + assertThat(counter).hasValue(1); } @Test public void testSelectFirst() { - List list = Multi.createFrom().range(1, 5).select().first(1) - .collectItems().asList().await().indefinitely(); + List list = Multi.createFrom().range(1, 5) + .onRequest().invoke(() -> counter.incrementAndGet()) + .select().first(1) + .collect().asList().await().indefinitely(); assertThat(list).containsExactly(1); + assertThat(counter).hasValue(1); } @SuppressWarnings("deprecation") @Test public void testSelectFirstDeprecated() { List list = Multi.createFrom().range(1, 5).transform().byTakingFirstItems(2) - .collectItems().asList().await().indefinitely(); + .collect().asList().await().indefinitely(); assertThat(list).containsExactly(1, 2); } @@ -53,23 +68,26 @@ public void testSelectFirstDeprecated() { @Test public void testSelectFirst0() { List list = Multi.createFrom().range(1, 5).select().first(0) - .collectItems().asList().await().indefinitely(); + .collect().asList().await().indefinitely(); assertThat(list).isEmpty(); } @Test public void testSelectLastWithLimit() { - List list = Multi.createFrom().range(1, 5).select().last(2) - .collectItems().asList().await().indefinitely(); + List list = Multi.createFrom().range(1, 5) + .onRequest().invoke(() -> counter.incrementAndGet()) + .select().last(2) + .collect().asList().await().indefinitely(); assertThat(list).containsExactly(3, 4); + assertThat(counter).hasValue(1); } @Test public void testSelectLast() { List list = Multi.createFrom().range(1, 5).select().last() - .collectItems().asList().await().indefinitely(); + .collect().asList().await().indefinitely(); assertThat(list).containsExactly(4); } @@ -78,7 +96,7 @@ public void testSelectLast() { @Test public void testSelectLastDeprecated() { List list = Multi.createFrom().range(1, 5).transform().byTakingLastItems(2) - .collectItems().asList().await().indefinitely(); + .collect().asList().await().indefinitely(); assertThat(list).containsExactly(3, 4); } @@ -87,7 +105,7 @@ public void testSelectLastDeprecated() { public void testSelectLastWith0() { List list = Multi.createFrom().range(1, 5) .select().last(0) - .collectItems().asList().await().indefinitely(); + .collect().asList().await().indefinitely(); assertThat(list).isEmpty(); }