From c9d3468a143c9da516dfee3c1796c5bb0278e77c Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 26 Jul 2017 15:36:29 +0100 Subject: [PATCH 1/2] fix #730 Use Flux instead of GroupedFlux for FluxWindowPredicate --- .../java/reactor/core/publisher/Flux.java | 30 +++--- .../core/publisher/FluxWindowPredicate.java | 96 +++++++++---------- .../publisher/FluxWindowPredicateTest.java | 75 ++------------- .../scenarios/FluxWindowConsistencyTest.java | 32 +++---- 4 files changed, 84 insertions(+), 149 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index a0f896da46..fc954612ed 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7144,10 +7144,10 @@ public final Flux> windowTimeout(int maxSize, Duration timespan, Schedul * * * @param boundaryTrigger a predicate that triggers the next window when it becomes true. - * @return a {@link Flux} of {@link GroupedFlux} windows, bounded depending - * on the predicate and keyed with the value that triggered the new window. + * @return a {@link Flux} of {@link Flux} windows, bounded depending + * on the predicate. */ - public final Flux> windowUntil(Predicate boundaryTrigger) { + public final Flux> windowUntil(Predicate boundaryTrigger) { return windowUntil(boundaryTrigger, false); } @@ -7169,10 +7169,10 @@ public final Flux> windowUntil(Predicate boundaryTrigger) { * * @param boundaryTrigger a predicate that triggers the next window when it becomes true. * @param cutBefore push to true to include the triggering element in the new window rather than the old. - * @return a {@link Flux} of {@link GroupedFlux} windows, bounded depending - * on the predicate and keyed with the value that triggered the new window. + * @return a {@link Flux} of {@link Flux} windows, bounded depending + * on the predicate. */ - public final Flux> windowUntil(Predicate boundaryTrigger, boolean cutBefore) { + public final Flux> windowUntil(Predicate boundaryTrigger, boolean cutBefore) { return windowUntil(boundaryTrigger, cutBefore, Queues.SMALL_BUFFER_SIZE); } @@ -7196,10 +7196,10 @@ public final Flux> windowUntil(Predicate boundaryTrigger, b * @param boundaryTrigger a predicate that triggers the next window when it becomes true. * @param cutBefore push to true to include the triggering element in the new window rather than the old. * @param prefetch the request size to use for this {@link Flux}. - * @return a {@link Flux} of {@link GroupedFlux} windows, bounded depending - * on the predicate and keyed with the value that triggered the new window. + * @return a {@link Flux} of {@link Flux} windows, bounded depending + * on the predicate. */ - public final Flux> windowUntil(Predicate boundaryTrigger, boolean cutBefore, int prefetch) { + public final Flux> windowUntil(Predicate boundaryTrigger, boolean cutBefore, int prefetch) { return onAssembly(new FluxWindowPredicate<>(this, Queues.unbounded(prefetch), Queues.unbounded(prefetch), @@ -7220,10 +7220,10 @@ public final Flux> windowUntil(Predicate boundaryTrigger, b * * * @param inclusionPredicate a predicate that triggers the next window when it becomes false. - * @return a {@link Flux} of {@link GroupedFlux} windows, each containing - * subsequent elements that all passed a predicate, and keyed with a separator element. + * @return a {@link Flux} of {@link Flux} windows, each containing + * subsequent elements that all passed a predicate. */ - public final Flux> windowWhile(Predicate inclusionPredicate) { + public final Flux> windowWhile(Predicate inclusionPredicate) { return windowWhile(inclusionPredicate, Queues.SMALL_BUFFER_SIZE); } @@ -7240,10 +7240,10 @@ public final Flux> windowWhile(Predicate inclusionPredicate * * @param inclusionPredicate a predicate that triggers the next window when it becomes false. * @param prefetch the request size to use for this {@link Flux}. - * @return a {@link Flux} of {@link GroupedFlux} windows, each containing - * subsequent elements that all passed a predicate, and keyed with a separator element. + * @return a {@link Flux} of {@link Flux} windows, each containing + * subsequent elements that all passed a predicate. */ - public final Flux> windowWhile(Predicate inclusionPredicate, int prefetch) { + public final Flux> windowWhile(Predicate inclusionPredicate, int prefetch) { return onAssembly(new FluxWindowPredicate<>(this, Queues.unbounded(prefetch), Queues.unbounded(prefetch), diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java index 75a6330557..2251c036cd 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java @@ -51,12 +51,12 @@ * * @see Reactive-Streams-Commons */ -final class FluxWindowPredicate extends FluxOperator> +final class FluxWindowPredicate extends FluxOperator> implements Fuseable{ final Supplier> groupQueueSupplier; - final Supplier>> mainQueueSupplier; + final Supplier>> mainQueueSupplier; final Mode mode; @@ -65,7 +65,7 @@ final class FluxWindowPredicate extends FluxOperator> final int prefetch; FluxWindowPredicate(Flux source, - Supplier>> mainQueueSupplier, + Supplier>> mainQueueSupplier, Supplier> groupQueueSupplier, int prefetch, Predicate predicate, @@ -84,7 +84,7 @@ final class FluxWindowPredicate extends FluxOperator> } @Override - public void subscribe(CoreSubscriber> s) { + public void subscribe(CoreSubscriber> s) { source.subscribe(new WindowPredicateMain<>(s, mainQueueSupplier.get(), groupQueueSupplier, @@ -99,10 +99,10 @@ public int getPrefetch() { } static final class WindowPredicateMain - implements Fuseable.QueueSubscription>, - InnerOperator> { + implements Fuseable.QueueSubscription>, + InnerOperator> { - final CoreSubscriber> actual; + final CoreSubscriber> actual; final Supplier> groupQueueSupplier; @@ -112,9 +112,9 @@ static final class WindowPredicateMain final int prefetch; - final Queue> queue; + final Queue> queue; - WindowGroupedFlux window; + WindowFlux window; volatile int wip; @SuppressWarnings("rawtypes") @@ -149,8 +149,8 @@ static final class WindowPredicateMain volatile boolean outputFused; - WindowPredicateMain(CoreSubscriber> actual, - Queue> queue, + WindowPredicateMain(CoreSubscriber> actual, + Queue> queue, Supplier> groupQueueSupplier, int prefetch, Predicate predicate, @@ -182,19 +182,19 @@ public void onSubscribe(Subscription s) { } void initializeWindow() { - WindowGroupedFlux g = new WindowGroupedFlux<>(null, + WindowFlux g = new WindowFlux<>( groupQueueSupplier.get(), this); window = g; queue.offer(g); } - void offerNewWindow(T key, @Nullable T emitInNewWindow) { + void offerNewWindow(@Nullable T emitInNewWindow) { // if the main is cancelled, don't create new groups if (cancelled == 0) { WINDOW_COUNT.getAndIncrement(this); - WindowGroupedFlux g = new WindowGroupedFlux<>(key, + WindowFlux g = new WindowFlux<>( groupQueueSupplier.get(), this); if (emitInNewWindow != null) { g.onNext(emitInNewWindow); @@ -215,7 +215,7 @@ public void onNext(T t) { Operators.onNextDropped(t); return; } - WindowGroupedFlux g = window; + WindowFlux g = window; boolean match; try { @@ -229,15 +229,15 @@ public void onNext(T t) { if (mode == Mode.UNTIL && match) { g.onNext(t); g.onComplete(); - offerNewWindow(t, null); + offerNewWindow(null); } else if (mode == Mode.UNTIL_CUT_BEFORE && match) { g.onComplete(); - offerNewWindow(t, t); + offerNewWindow(t); } else if (mode == Mode.WHILE && !match) { g.onComplete(); - offerNewWindow(t, null); + offerNewWindow(null); //compensate for the dropped delimiter s.request(1); } @@ -263,7 +263,7 @@ public void onComplete() { return; } - WindowGroupedFlux g = window; + WindowFlux g = window; if (g != null) { g.onComplete(); } @@ -293,14 +293,14 @@ public Stream inners() { } @Override - public CoreSubscriber> actual() { + public CoreSubscriber> actual() { return actual; } void signalAsyncError() { Throwable e = Exceptions.terminate(ERROR, this); windowCount = 0; - WindowGroupedFlux g = window; + WindowFlux g = window; if (g != null) { g.onError(e); } @@ -325,9 +325,9 @@ public void cancel() { else if (!outputFused) { if (WIP.getAndIncrement(this) == 0) { // remove queued up but unobservable groups from the mapping - GroupedFlux g; + Flux g; while ((g = queue.poll()) != null) { - ((WindowGroupedFlux) g).cancel(); + ((WindowFlux) g).cancel(); } if (WIP.decrementAndGet(this) == 0) { @@ -366,8 +366,8 @@ void drain() { void drainFused() { int missed = 1; - final Subscriber> a = actual; - final Queue> q = queue; + final Subscriber> a = actual; + final Queue> q = queue; for (; ; ) { @@ -402,8 +402,8 @@ void drainLoop() { int missed = 1; - Subscriber> a = actual; - Queue> q = queue; + Subscriber> a = actual; + Queue> q = queue; for (; ; ) { @@ -412,7 +412,7 @@ void drainLoop() { while (e != r) { boolean d = done; - GroupedFlux v = q.poll(); + Flux v = q.poll(); boolean empty = v == null; if (checkTerminated(d, empty, a, q)) { @@ -453,7 +453,7 @@ void drainLoop() { boolean checkTerminated(boolean d, boolean empty, Subscriber a, - Queue> q) { + Queue> q) { if (cancelled != 0) { q.clear(); @@ -477,7 +477,7 @@ else if (empty) { @Override @Nullable - public GroupedFlux poll() { + public Flux poll() { return queue.poll(); } @@ -506,23 +506,15 @@ public int requestFusion(int requestedMode) { } } - static final class WindowGroupedFlux extends GroupedFlux + static final class WindowFlux extends Flux implements Fuseable, Fuseable.QueueSubscription, InnerOperator { - final T key; - - @Override - @Nullable - public T key() { - return key; - } - final Queue queue; volatile WindowPredicateMain parent; @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater - PARENT = AtomicReferenceFieldUpdater.newUpdater(WindowGroupedFlux.class, + static final AtomicReferenceFieldUpdater + PARENT = AtomicReferenceFieldUpdater.newUpdater(WindowFlux.class, WindowPredicateMain.class, "parent"); @@ -531,8 +523,8 @@ public T key() { volatile CoreSubscriber actual; @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ACTUAL = - AtomicReferenceFieldUpdater.newUpdater(WindowGroupedFlux.class, + static final AtomicReferenceFieldUpdater ACTUAL = + AtomicReferenceFieldUpdater.newUpdater(WindowFlux.class, CoreSubscriber.class, "actual"); @@ -540,28 +532,26 @@ public T key() { volatile int once; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(WindowGroupedFlux.class, "once"); + static final AtomicIntegerFieldUpdater ONCE = + AtomicIntegerFieldUpdater.newUpdater(WindowFlux.class, "once"); volatile int wip; @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(WindowGroupedFlux.class, "wip"); + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(WindowFlux.class, "wip"); volatile long requested; @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(WindowGroupedFlux.class, "requested"); + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(WindowFlux.class, "requested"); volatile boolean enableOperatorFusion; int produced; - WindowGroupedFlux( - @Nullable T key, + WindowFlux( Queue queue, WindowPredicateMain parent) { - this.key = key; this.queue = queue; this.parent = parent; } @@ -847,7 +837,7 @@ public Object scanUnsafe(Attr key) { @Override public String toString() { - return "WindowGroupedFlux[" + key + "]"; + return "WindowFlux"; } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java index 4eaacceec9..67191ebbb1 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxWindowPredicateTest.java @@ -17,7 +17,6 @@ package reactor.core.publisher; import java.time.Duration; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -40,10 +39,10 @@ import static org.assertj.core.api.Assertions.assertThat; public class FluxWindowPredicateTest extends - FluxOperatorTest> { + FluxOperatorTest> { @Override - protected Scenario> defaultScenarioOptions(Scenario> defaultOptions) { + protected Scenario> defaultScenarioOptions(Scenario> defaultOptions) { return defaultOptions.shouldAssertPostTerminateState(false) .fusionMode(Fuseable.ASYNC) .fusionModeThreadBarrier(Fuseable.ANY) @@ -51,7 +50,7 @@ protected Scenario> defaultScenarioOptions(S } @Override - protected List>> scenarios_operatorSuccess() { + protected List>> scenarios_operatorSuccess() { return Arrays.asList( scenario(f -> f.windowUntil(t -> true, true, 1)) .prefetch(1) @@ -84,7 +83,7 @@ protected List>> scenarios_operator } @Override - protected List>> scenarios_operatorError() { + protected List>> scenarios_operatorError() { return Arrays.asList( scenario(f -> f.windowWhile(t -> { throw exception(); @@ -104,7 +103,7 @@ protected List>> scenarios_operator } @Override - protected List>> scenarios_errorFromUpstreamFailure() { + protected List>> scenarios_errorFromUpstreamFailure() { return Arrays.asList( scenario(f -> f.windowWhile(t -> true)) .receive(s -> s.buffer().subscribe(null, e -> assertThat(e).hasMessage("test"))), @@ -166,7 +165,6 @@ public void apiWhile() { } @Test - @SuppressWarnings("unchecked") public void normalUntil() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowUntil = new FluxWindowPredicate<>(sp1, @@ -202,7 +200,6 @@ public void normalUntil() { } @Test - @SuppressWarnings("unchecked") public void onCompletionBeforeLastBoundaryWindowEmitted() { Flux source = Flux.just(1, 2); @@ -235,7 +232,6 @@ public void onCompletionBeforeLastBoundaryWindowEmitted() { } @Test - @SuppressWarnings("unchecked") public void mainErrorUntilIsPropagatedToBothWindowAndMain() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowUntil = new FluxWindowPredicate<>( @@ -262,7 +258,6 @@ public void mainErrorUntilIsPropagatedToBothWindowAndMain() { } @Test - @SuppressWarnings("unchecked") public void predicateErrorUntil() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowUntil = new FluxWindowPredicate<>( @@ -291,7 +286,6 @@ public void predicateErrorUntil() { } @Test - @SuppressWarnings("unchecked") public void normalUntilCutBefore() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowUntilCutBefore = new FluxWindowPredicate<>(sp1, @@ -323,7 +317,6 @@ public void normalUntilCutBefore() { } @Test - @SuppressWarnings("unchecked") public void mainErrorUntilCutBeforeIsPropagatedToBothWindowAndMain() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowUntilCutBefore = @@ -351,7 +344,6 @@ public void mainErrorUntilCutBeforeIsPropagatedToBothWindowAndMain() { } @Test - @SuppressWarnings("unchecked") public void predicateErrorUntilCutBefore() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowUntilCutBefore = @@ -386,7 +378,6 @@ private Predicate> signalErrorMessage(String expectedMessa } @Test - @SuppressWarnings("unchecked") public void normalWhile() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowWhile = new FluxWindowPredicate<>( @@ -418,7 +409,6 @@ public void normalWhile() { } @Test - @SuppressWarnings("unchecked") public void normalWhileDoesntInitiallyMatch() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowWhile = new FluxWindowPredicate<>( @@ -457,7 +447,6 @@ public void normalWhileDoesntInitiallyMatch() { } @Test - @SuppressWarnings("unchecked") public void normalWhileDoesntMatch() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowWhile = new FluxWindowPredicate<>( @@ -493,7 +482,6 @@ public void normalWhileDoesntMatch() { } @Test - @SuppressWarnings("unchecked") public void mainErrorWhileIsPropagatedToBothWindowAndMain() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowWhile = new FluxWindowPredicate<>( @@ -543,7 +531,6 @@ public void whileOnlySeparatorsGivesSequenceOfWindows() { } @Test - @SuppressWarnings("unchecked") public void predicateErrorWhile() { DirectProcessor sp1 = DirectProcessor.create(); FluxWindowPredicate windowWhile = new FluxWindowPredicate<>( @@ -594,48 +581,6 @@ public void whileRequestOneByOne() { .verifyComplete(); } - @Test - public void groupsHaveCorrectKeysWhile() { - List keys = new ArrayList<>(10); - - StepVerifier.create(Flux.just("red", "green", "#1", "orange", "blue", "#2", "black", "white") - .windowWhile(color -> !color.startsWith("#")) - .doOnNext(w -> keys.add(w.key())) - .flatMap(w -> w)) - .expectNext("red", "green", "orange", "blue", "black", "white") - .verifyComplete(); - - assertThat(keys).containsExactly(null, "#1", "#2"); - } - - @Test - public void groupsHaveCorrectKeysUntil() { - List keys = new ArrayList<>(10); - - StepVerifier.create(Flux.just("red", "green", "#1", "orange", "blue", "#2", "black", "white") - .windowUntil(color -> color.startsWith("#")) - .doOnNext(w -> keys.add(w.key())) - .flatMap(w -> w)) - .expectNext("red", "green", "#1", "orange", "blue", "#2", "black", "white") - .verifyComplete(); - - assertThat(keys).containsExactly(null, "#1", "#2"); - } - - @Test - public void groupsHaveCorrectKeysUntilCutBefore() { - List keys = new ArrayList<>(10); - - StepVerifier.create(Flux.just("red", "green", "#1", "orange", "blue", "#2", "black", "white") - .windowUntil(color -> color.startsWith("#"), true) - .doOnNext(w -> keys.add(w.key())) - .flatMap(w -> w)) - .expectNext("red", "green", "#1", "orange", "blue", "#2", "black", "white") - .verifyComplete(); - - assertThat(keys).containsExactly(null, "#1", "#2"); - } - @Test public void mismatchAtBeginningUntil() { StepVerifier.create(Flux.just("#", "red", "green") @@ -827,9 +772,9 @@ public void windowUntilUnboundedStartingDelimiterReplenishes() { @Test public void scanMainSubscriber() { - CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); + CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); FluxWindowPredicate.WindowPredicateMain test = new FluxWindowPredicate.WindowPredicateMain<>(actual, - Queues.>unbounded().get(), Queues.unbounded(), 123, i -> true, Mode.WHILE); + Queues.>unbounded().get(), Queues.unbounded(), 123, i -> true, Mode.WHILE); Subscription parent = Operators.emptySubscription(); test.onSubscribe(parent); @@ -859,10 +804,10 @@ public void scanMainSubscriber() { @Test public void scanOtherSubscriber() { - CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); + CoreSubscriber> actual = new LambdaSubscriber<>(null, e -> {}, null, null); FluxWindowPredicate.WindowPredicateMain main = new FluxWindowPredicate.WindowPredicateMain<>(actual, - Queues.>unbounded().get(), Queues.unbounded(), 123, i -> true, Mode.WHILE); - FluxWindowPredicate.WindowGroupedFlux test = new FluxWindowPredicate.WindowGroupedFlux<>(1, + Queues.>unbounded().get(), Queues.unbounded(), 123, i -> true, Mode.WHILE); + FluxWindowPredicate.WindowFlux test = new FluxWindowPredicate.WindowFlux<>( Queues.unbounded().get(), main); Subscription parent = Operators.emptySubscription(); diff --git a/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxWindowConsistencyTest.java b/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxWindowConsistencyTest.java index 968419ea2a..45b216b2a3 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxWindowConsistencyTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/scenarios/FluxWindowConsistencyTest.java @@ -233,16 +233,16 @@ public void windowStartEndComplete() throws Exception { @Test public void windowUntilComplete() throws Exception { - Flux> windows = source.windowUntil(i -> i % 3 == 0); - subscribeGroups(windows); + Flux> windows = source.windowUntil(i -> i % 3 == 0); + subscribe(windows); generateAndComplete(1, 5); verifyMainComplete(Arrays.asList(1, 2, 3), Arrays.asList(4, 5)); } @Test public void windowWhileComplete() throws Exception { - Flux> windows = source.windowWhile(i -> i % 3 != 0); - subscribeGroups(windows); + Flux> windows = source.windowWhile(i -> i % 3 != 0); + subscribe(windows); generateAndComplete(1, 5); verifyMainComplete(Arrays.asList(1, 2), Arrays.asList(4, 5)); } @@ -341,16 +341,16 @@ public void windowStartEndMainCancel() throws Exception { @Test public void windowUntilMainCancel() throws Exception { - Flux> windows = source.windowUntil(i -> i % 3 == 0); - subscribeGroups(windows); + Flux> windows = source.windowUntil(i -> i % 3 == 0); + subscribe(windows); generateWithCancel(1, 4, 10); verifyMainCancel(true, Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)); } @Test public void windowWhileMainCancel() throws Exception { - Flux> windows = source.windowWhile(i -> i % 3 != 0); - subscribeGroups(windows); + Flux> windows = source.windowWhile(i -> i % 3 != 0); + subscribe(windows); generateWithCancel(1, 4, 10); verifyMainCancel(true, Arrays.asList(1, 2), Arrays.asList(4, 5)); } @@ -442,16 +442,16 @@ public void windowStartEndMainCancelNoNewWindow() throws Exception { @Test public void windowUntilMainCancelNoNewWindow() throws Exception { - Flux> windows = source.windowUntil(i -> i % 3 == 0); - subscribeGroups(windows); + Flux> windows = source.windowUntil(i -> i % 3 == 0); + subscribe(windows); generateWithCancel(0, 4, 1); verifyMainCancelNoNewWindow(2, Arrays.asList(0), Arrays.asList(1, 2, 3)); } @Test public void windowWhileMainCancelNoNewWindow() throws Exception { - Flux> windows = source.windowWhile(i -> i % 3 != 1); - subscribeGroups(windows); + Flux> windows = source.windowWhile(i -> i % 3 != 1); + subscribe(windows); generateWithCancel(0, 4, 1); verifyMainCancelNoNewWindow(2, Arrays.asList(0), Arrays.asList(2, 3)); } @@ -527,16 +527,16 @@ public void windowStartEndInnerCancel() throws Exception { @Test public void windowUntilInnerCancel() throws Exception { - Flux> windows = source.windowUntil(i -> i % 3 == 0); - subscribeGroups(windows); + Flux> windows = source.windowUntil(i -> i % 3 == 0); + subscribe(windows); generateWithCancel(0, 6, 1); verifyInnerCancel(1, i -> i != 3, Arrays.asList(0), Arrays.asList(1, 2)); } @Test public void windowWhileInnerCancel() throws Exception { - Flux> windows = source.windowWhile(i -> i % 3 != 1); - subscribeGroups(windows); + Flux> windows = source.windowWhile(i -> i % 3 != 1); + subscribe(windows); generateWithCancel(0, 6, 1); verifyInnerCancel(1, i -> i != 3, Arrays.asList(0), Arrays.asList(2)); } From e6a22534d2d4a80d73cb79b18a65bf5c922fc9b7 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Wed, 26 Jul 2017 20:00:41 +0100 Subject: [PATCH 2/2] Remove unused toString --- .../java/reactor/core/publisher/FluxWindowPredicate.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java index 2251c036cd..bd4a32c177 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxWindowPredicate.java @@ -834,11 +834,6 @@ public Object scanUnsafe(Attr key) { return InnerOperator.super.scanUnsafe(key); } - - @Override - public String toString() { - return "WindowFlux"; - } } }