Skip to content

Commit

Permalink
Merge pull request #525 from smallrye/multi-collect-last-invalid-requ…
Browse files Browse the repository at this point in the history
…ests

Multiple upstream requests in the multi.collect().last() operator
  • Loading branch information
jponge authored Apr 26, 2021
2 parents 68e96b3 + 328359a commit 3f48621
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public void onSubscribe(Subscription subscription) {
}
}

@Override
public void request(long numberOfItems) {
// Ignored, we already requested Long.MAX
}

@Override
public void onItem(T item) {
last = item;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
Expand Down Expand Up @@ -367,6 +368,28 @@ public void testCollectWhenWithFailure() {
.hasMessageContaining("boom");
}

@Test
public void testThatLastEmitASingleRequest() {
AtomicInteger counter = new AtomicInteger();
Multi.createFrom().items("a", "b", "c")
.onRequest().invoke(counter::incrementAndGet)
.collect().last()
.await().indefinitely();

assertThat(counter).hasValue(1);
}

@Test
public void testThatFirstEmitASingleRequest() {
AtomicInteger counter = new AtomicInteger();
Multi.createFrom().items("a", "b", "c")
.onRequest().invoke(counter::incrementAndGet)
.collect().first()
.await().indefinitely();

assertThat(counter).hasValue(1);
}

static class Person {

private final String firstName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
Expand All @@ -25,51 +27,67 @@

public class MultiSelectFirstOrLast {

private AtomicInteger counter;

@BeforeEach
public void init() {
counter = new AtomicInteger();
}

@Test
public void testSelectFirstWithLimit() {
List<Integer> list = Multi.createFrom().range(1, 5).select().first(2)
.collectItems().asList().await().indefinitely();
List<Integer> list = Multi.createFrom().range(1, 5)
.onRequest().invoke(() -> counter.incrementAndGet())
.select().first(2)
.collect().asList().await().indefinitely();

assertThat(list).containsExactly(1, 2);
assertThat(counter).hasValue(1);
}

@Test
public void testSelectFirst() {
List<Integer> list = Multi.createFrom().range(1, 5).select().first(1)
.collectItems().asList().await().indefinitely();
List<Integer> list = Multi.createFrom().range(1, 5)
.onRequest().invoke(() -> counter.incrementAndGet())
.select().first(1)
.collect().asList().await().indefinitely();

assertThat(list).containsExactly(1);
assertThat(counter).hasValue(1);
}

@SuppressWarnings("deprecation")
@Test
public void testSelectFirstDeprecated() {
List<Integer> list = Multi.createFrom().range(1, 5).transform().byTakingFirstItems(2)
.collectItems().asList().await().indefinitely();
.collect().asList().await().indefinitely();

assertThat(list).containsExactly(1, 2);
}

@Test
public void testSelectFirst0() {
List<Integer> list = Multi.createFrom().range(1, 5).select().first(0)
.collectItems().asList().await().indefinitely();
.collect().asList().await().indefinitely();

assertThat(list).isEmpty();
}

@Test
public void testSelectLastWithLimit() {
List<Integer> list = Multi.createFrom().range(1, 5).select().last(2)
.collectItems().asList().await().indefinitely();
List<Integer> list = Multi.createFrom().range(1, 5)
.onRequest().invoke(() -> counter.incrementAndGet())
.select().last(2)
.collect().asList().await().indefinitely();

assertThat(list).containsExactly(3, 4);
assertThat(counter).hasValue(1);
}

@Test
public void testSelectLast() {
List<Integer> list = Multi.createFrom().range(1, 5).select().last()
.collectItems().asList().await().indefinitely();
.collect().asList().await().indefinitely();

assertThat(list).containsExactly(4);
}
Expand All @@ -78,7 +96,7 @@ public void testSelectLast() {
@Test
public void testSelectLastDeprecated() {
List<Integer> list = Multi.createFrom().range(1, 5).transform().byTakingLastItems(2)
.collectItems().asList().await().indefinitely();
.collect().asList().await().indefinitely();

assertThat(list).containsExactly(3, 4);
}
Expand All @@ -87,7 +105,7 @@ public void testSelectLastDeprecated() {
public void testSelectLastWith0() {
List<Integer> list = Multi.createFrom().range(1, 5)
.select().last(0)
.collectItems().asList().await().indefinitely();
.collect().asList().await().indefinitely();

assertThat(list).isEmpty();
}
Expand Down

0 comments on commit 3f48621

Please sign in to comment.