From a81c239765691e35c4cb732af18e4501de71ff72 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 10 Aug 2023 18:14:41 +0300 Subject: [PATCH] ConcatMap flaky test : Change how the upstream request is deferred until the next downstream request --- .../operators/multi/MultiConcatMapOp.java | 39 +++++++++---------- .../multi/MultiConcatMapNoPrefetchTest.java | 7 ++-- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java index 3f62fae2a..28736c5c2 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java @@ -2,6 +2,7 @@ import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -78,6 +79,8 @@ public static final class ConcatMapMainSubscriber implements MultiSubscrib final ConcatMapInner inner; + private final AtomicBoolean deferredUpstreamRequest = new AtomicBoolean(false); + ConcatMapMainSubscriber( MultiSubscriber downstream, Function> mapper, @@ -93,13 +96,16 @@ public void request(long n) { if (n > 0) { if (state.compareAndSet(STATE_NEW, STATE_READY)) { upstream.request(1); - // No outstanding requests from inner, forward the request to upstream - } else if (state.get() == STATE_READY && inner.requested() == 0) { + } + if (deferredUpstreamRequest.compareAndSet(true, false)) { upstream.request(1); } inner.request(n); + if (inner.requested() != 0L && deferredUpstreamRequest.compareAndSet(true, false)) { + upstream.request(1); + } } else { - downstream.onFailure(new IllegalArgumentException("Invalid requests, must be greater than 0")); + downstream.onFailure(Subscriptions.getInvalidRequestException()); } } @@ -187,26 +193,17 @@ public synchronized void tryEmit(O value) { } public void innerComplete(long emitted) { - while (true) { - int state = this.state.get(); - if (state == STATE_EMITTING) { - if (this.state.compareAndSet(state, STATE_READY)) { - // Inner completed but there are outstanding requests from inner, - // Or the inner completed without producing any items - // Request new item from upstream - if (inner.requested() != 0L || emitted == 0) { - upstream.request(1); - } - return; - } - } else if (state == STATE_OUTER_TERMINATED) { - if (this.state.compareAndSet(state, STATE_TERMINATED)) { - terminateDownstream(); - return; - } + if (this.state.compareAndSet(STATE_EMITTING, STATE_READY)) { + // Inner completed but there are outstanding requests from inner, + // Or the inner completed without producing any items + // Request new item from upstream + if (inner.requested() != 0L || emitted == 0) { + upstream.request(1); } else { - return; + deferredUpstreamRequest.set(true); } + } else if (this.state.compareAndSet(STATE_OUTER_TERMINATED, STATE_TERMINATED)) { + terminateDownstream(); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java index 09badcc10..63304632e 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java @@ -10,7 +10,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -22,7 +21,6 @@ import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.smallrye.mutiny.infrastructure.Infrastructure; -@DisabledIfEnvironmentVariable(named = "CI", matches = "true") // TODO Disabled in CI until https://github.com/smallrye/smallrye-mutiny/issues/1254 has been fixed class MultiConcatMapNoPrefetchTest { AtomicInteger upstreamRequestCount; @@ -70,9 +68,9 @@ void testTransformToMulti(boolean prefetch, int[] upstreamRequests) { Multi result = upstream.onItem() .transformToMulti(i -> Multi.createFrom().items(i, i)) .concatenate(prefetch); - AssertSubscriber ts = new AssertSubscriber<>(5); + AssertSubscriber ts = new AssertSubscriber<>(); result.runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe(ts); - ts.request(5); + ts.request(10); ts.awaitItems(10); assertThat(upstreamRequestCount).hasValue(upstreamRequests[0]); ts.request(1); @@ -137,6 +135,7 @@ void testNoPrefetchWithConcatMapContainingEmpty() { .concatenate(); AssertSubscriber ts = new AssertSubscriber<>(5); result.runSubscriptionOn(Infrastructure.getDefaultExecutor()).subscribe(ts); + ts.awaitSubscription(); ts.request(5); ts.awaitItems(10); assertThat(upstreamRequestCount).hasValueGreaterThan(10);