Skip to content

Commit

Permalink
Ensure StepVerifier evaluates signal that exits thenConsumeWhile (#3203)
Browse files Browse the repository at this point in the history
This fixes a situation where the first non-matching onNext signal could
be skipped if the thenConsumeWhile was followed by a `TaskEvent` step.

Fixes #3121.
  • Loading branch information
simonbasle authored Oct 5, 2022
1 parent db8902d commit f04c160
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1449,7 +1449,8 @@ final void onExpectation(Signal<T> actualSignal) {
return;
}
//possibly re-evaluate the current onNext
event = this.script.peek();
onExpectation(actualSignal);
return;
}

if (event instanceof CollectEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package reactor.test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

Expand All @@ -46,6 +48,91 @@
*/
public class DefaultStepVerifierBuilderTests {

@Test
void consumeWhileFollowedByTaskEvent() {
List<Integer> consumed = new ArrayList<>();

Flux.range(1, 20)
.as(StepVerifier::create)
.thenConsumeWhile(i -> i < 18, consumed::add)
.then(() -> consumed.add(100))
.expectNext(18, 19, 20)
.verifyComplete();

assertThat(consumed)
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 100);
}

@Test
void thenConsumeWhile_NoElementsShouldBeConsumed_oneElement() {
Flux<String> given = Flux.just("a");

StepVerifier.create(given)
.thenConsumeWhile(s -> s.equals("42"))
.then(() -> {}) // do nothing
.consumeNextWith(s -> Assertions.assertEquals("a", s))
.verifyComplete();

// expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())
}

@Test
void thenConsumeWhile_NoElementsShouldBeConsumed_consumeWhileTwice() {
Flux<String> given = Flux.just("a", "a");

StepVerifier.create(given)
.thenConsumeWhile(s -> s.equals("42"))
.then(() -> {})
.thenConsumeWhile(s -> s.equals("42"))
.then(() -> {})
.consumeNextWith(s -> Assertions.assertEquals("a", s))
.consumeNextWith(s -> Assertions.assertEquals("a", s))
.verifyComplete();

// expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())
}

@Test
void thenConsumeWhile_NoElementsShouldBeConsumed_moreThanOneElement() {
Flux<String> given = Flux.just("a", "b");

StepVerifier.create(given)
.thenConsumeWhile(s -> s.equals("42"))
.then(() -> {}) // do nothing
.consumeNextWith(s -> Assertions.assertEquals("a", s))
.consumeNextWith(s -> Assertions.assertEquals("b", s))
.verifyComplete();

// org.opentest4j.AssertionFailedError: expected: <a> but was: <b>
}

@Test
void thenConsumeWhile_OneElementShouldBeConsumed() {
Flux<String> given = Flux.just("a", "b");

StepVerifier.create(given)
.thenConsumeWhile(s -> s.equals("a"))
.then(() -> {}) // do nothing
.consumeNextWith(s -> Assertions.assertEquals("b", s))
.verifyComplete();

// expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())
}

@Test
void thenConsumeWhile_onlyTwoElementsShouldBeConsumed() {
Flux<String> given = Flux.just("a", "a", "b");

StepVerifier.create(given)
.thenConsumeWhile(s -> s.equals("a"))
.then(() -> {}) // do nothing
.consumeNextWith(s -> Assertions.assertEquals("b", s))
.verifyComplete();

// expectation "consumeNextWith" failed (expected: onNext(); actual: onComplete())
}

@Test
public void subscribedTwice() {
Flux<String> flux = Flux.just("foo", "bar");
Expand Down

0 comments on commit f04c160

Please sign in to comment.