diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java
index ef925be53..eb23519a9 100644
--- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java
+++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiConcatMapOp.java
@@ -2,17 +2,20 @@
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
-import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.MultiSubscriber;
+import io.smallrye.mutiny.subscription.SwitchableSubscriptionSubscriber;
/**
* ConcatMap operator without prefetching items from the upstream.
@@ -22,7 +25,7 @@
*
The inner has no more outstanding requests.
* The inner completed without emitting items or with outstanding requests.
*
- *
+ *
* This operator can collect failures and postpone them until termination.
*
* @param the upstream value type / input type
@@ -47,225 +50,265 @@ public void subscribe(MultiSubscriber super O> subscriber) {
if (subscriber == null) {
throw new NullPointerException("The subscriber must not be `null`");
}
- ConcatMapSubscriber concatMapSubscriber = new ConcatMapSubscriber<>(mapper, postponeFailurePropagation,
- subscriber);
- upstream.subscribe(Infrastructure.onMultiSubscription(upstream, concatMapSubscriber));
+ ConcatMapMainSubscriber sub = new ConcatMapMainSubscriber<>(subscriber,
+ mapper,
+ postponeFailurePropagation);
+
+ upstream.subscribe(Infrastructure.onMultiSubscription(upstream, sub));
}
- static class ConcatMapSubscriber implements MultiSubscriber, Subscription, ContextSupport {
+ public static final class ConcatMapMainSubscriber implements MultiSubscriber, Subscription, ContextSupport {
- private enum State {
- INIT,
- WAITING_NEXT_PUBLISHER,
- WAITING_NEXT_SUBSCRIPTION,
- EMITTING,
- CANCELLED,
- }
+ private static final int STATE_NEW = 0; // no request yet -- send first upstream request at this state
+ private static final int STATE_READY = 1; // first upstream request done, ready to receive items
+ private static final int STATE_EMITTING = 2; // received item from the upstream, subscribed to the inner
+ private static final int STATE_OUTER_TERMINATED = 3; // outer terminated, waiting for the inner to terminate
+ private static final int STATE_TERMINATED = 4; // inner and outer terminated
+ private static final int STATE_CANCELLED = 5; // cancelled
+ final AtomicInteger state = new AtomicInteger(STATE_NEW);
+
+ final MultiSubscriber super O> downstream;
+ final Function super I, ? extends Publisher extends O>> mapper;
+ private final boolean delayError;
+
+ final AtomicReference failures = new AtomicReference<>();
+
+ volatile Subscription upstream = null;
+ private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
+ .newUpdater(ConcatMapMainSubscriber.class, Subscription.class, "upstream");
+
+ final ConcatMapInner inner;
- private final Function super I, ? extends Publisher extends O>> mapper;
- private final boolean postponeFailurePropagation;
- private final MultiSubscriber super O> downstream;
- private volatile long demand = 0L;
- private volatile State state = State.INIT;
- private volatile Subscription upstream;
- private Subscription currentUpstream;
- private boolean upstreamHasCompleted = false;
- private Throwable failure;
-
- private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
- .newUpdater(ConcatMapSubscriber.class, Subscription.class, "upstream");
- private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater
- .newUpdater(ConcatMapSubscriber.class, State.class, "state");
- private static final AtomicLongFieldUpdater DEMAND_UPDATER = AtomicLongFieldUpdater
- .newUpdater(ConcatMapSubscriber.class, "demand");
-
- ConcatMapSubscriber(Function super I, ? extends Publisher extends O>> mapper, boolean postponeFailurePropagation,
- MultiSubscriber super O> downstream) {
+ private final AtomicBoolean deferredUpstreamRequest = new AtomicBoolean(false);
+
+ ConcatMapMainSubscriber(
+ MultiSubscriber super O> downstream,
+ Function super I, ? extends Publisher extends O>> mapper,
+ boolean delayError) {
this.downstream = downstream;
this.mapper = mapper;
- this.postponeFailurePropagation = postponeFailurePropagation;
+ this.delayError = delayError;
+ this.inner = new ConcatMapInner<>(this);
}
@Override
- public Context context() {
- if (downstream instanceof ContextSupport) {
- return ((ContextSupport) downstream).context();
+ public void request(long n) {
+ if (n > 0) {
+ if (state.compareAndSet(STATE_NEW, STATE_READY)) {
+ upstream.request(1);
+ }
+ if (deferredUpstreamRequest.compareAndSet(true, false)) {
+ upstream.request(1);
+ }
+ inner.request(n);
+ if (inner.requested() != 0L && deferredUpstreamRequest.compareAndSet(true, false)) {
+ upstream.request(1);
+ }
} else {
- return Context.empty();
+ downstream.onFailure(Subscriptions.getInvalidRequestException());
}
}
- private final MultiSubscriber innerSubscriber = new InnerSubscriber();
+ @Override
+ public void cancel() {
+ while (true) {
+ int state = this.state.get();
+ if (state == STATE_CANCELLED) {
+ return;
+ }
+ if (this.state.compareAndSet(state, STATE_CANCELLED)) {
+ if (state == STATE_OUTER_TERMINATED) {
+ inner.cancel();
+ } else {
+ inner.cancel();
+ upstream.cancel();
+ }
+ return;
+ }
+ }
+ }
@Override
public void onSubscribe(Subscription subscription) {
if (UPSTREAM_UPDATER.compareAndSet(this, null, subscription)) {
downstream.onSubscribe(this);
- } else {
- subscription.cancel();
}
}
@Override
public void onItem(I item) {
- if (state == State.CANCELLED) {
+ if (!state.compareAndSet(STATE_READY, STATE_EMITTING)) {
return;
}
- if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.WAITING_NEXT_SUBSCRIPTION)) {
- try {
- Publisher extends O> publisher = mapper.apply(item);
- if (publisher == null) {
- throw new NullPointerException("The mapper produced a null publisher");
- }
- publisher.subscribe(innerSubscriber);
- } catch (Throwable err) {
- upstream.cancel();
- onFailure(err);
+
+ try {
+ Publisher extends O> p = mapper.apply(item);
+ if (p == null) {
+ throw new NullPointerException(ParameterValidation.MAPPER_RETURNED_NULL);
+ }
+
+ p.subscribe(inner);
+ } catch (Throwable e) {
+ if (postponeFailure(e, upstream)) {
+ innerComplete(0L);
}
}
}
@Override
- public void onFailure(Throwable failure) {
- if (STATE_UPDATER.getAndSet(this, State.CANCELLED) == State.CANCELLED) {
- return;
+ public void onFailure(Throwable t) {
+ if (postponeFailure(t, inner)) {
+ onCompletion();
}
- downstream.onFailure(addFailure(failure));
}
- private Throwable addFailure(Throwable failure) {
- if (this.failure != null) {
- if (this.failure instanceof CompositeException) {
- this.failure = new CompositeException((CompositeException) this.failure, failure);
+ @Override
+ public void onCompletion() {
+ while (true) {
+ int state = this.state.get();
+ if (state == STATE_NEW || state == STATE_READY) {
+ if (this.state.compareAndSet(state, STATE_TERMINATED)) {
+ terminateDownstream();
+ return;
+ }
+ } else if (state == STATE_EMITTING) {
+ if (this.state.compareAndSet(state, STATE_OUTER_TERMINATED)) {
+ return;
+ }
} else {
- this.failure = new CompositeException(this.failure, failure);
+ return;
}
- } else {
- this.failure = failure;
}
- return this.failure;
}
- @Override
- public void onCompletion() {
- if (state == State.CANCELLED) {
- return;
+ public synchronized void tryEmit(O value) {
+ switch (state.get()) {
+ case STATE_EMITTING:
+ case STATE_OUTER_TERMINATED:
+ downstream.onItem(value);
+ break;
+ default:
+ break;
}
- upstreamHasCompleted = true;
- if (STATE_UPDATER.compareAndSet(this, State.WAITING_NEXT_PUBLISHER, State.CANCELLED)
- || STATE_UPDATER.compareAndSet(this, State.INIT, State.CANCELLED)) {
- if (failure == null) {
- downstream.onCompletion();
+ }
+
+ public void innerComplete(long emitted) {
+ if (this.state.compareAndSet(STATE_EMITTING, STATE_READY)) {
+ // Inner completed but there are outstanding requests from inner,
+ // Or the inner completed without producing any items
+ // Request new item from upstream
+ if (inner.requested() != 0L || emitted == 0) {
+ upstream.request(1);
} else {
- downstream.onFailure(failure);
+ deferredUpstreamRequest.set(true);
}
+ } else if (this.state.compareAndSet(STATE_OUTER_TERMINATED, STATE_TERMINATED)) {
+ terminateDownstream();
}
}
- @Override
- public void request(long n) {
- if (state == State.CANCELLED) {
- return;
+ public void innerFailure(Throwable e, long emitted) {
+ if (postponeFailure(e, upstream)) {
+ innerComplete(emitted);
}
- if (n <= 0) {
- cancel();
- downstream.onFailure(Subscriptions.getInvalidRequestException());
- } else {
- Subscriptions.add(DEMAND_UPDATER, this, n);
- if (STATE_UPDATER.compareAndSet(this, State.INIT, State.WAITING_NEXT_PUBLISHER)) {
- upstream.request(1L);
+ }
+
+ private boolean postponeFailure(Throwable e, Subscription subscription) {
+ if (e == null) {
+ return true;
+ }
+
+ Subscriptions.addFailure(failures, e);
+
+ if (delayError) {
+ return true;
+ }
+
+ while (true) {
+ int state = this.state.get();
+ if (state == STATE_CANCELLED || state == STATE_TERMINATED) {
+ return false;
} else {
- if (state == State.WAITING_NEXT_PUBLISHER) {
- upstream.request(1L);
- } else if (state == State.EMITTING) {
- currentUpstream.request(n);
+ if (this.state.compareAndSet(state, STATE_TERMINATED)) {
+ subscription.cancel();
+ synchronized (this) {
+ downstream.onFailure(failures.get());
+ }
+ return false;
}
}
}
}
- @Override
- public void cancel() {
- State previousState = STATE_UPDATER.getAndSet(this, State.CANCELLED);
- if (previousState == State.CANCELLED) {
+ private void terminateDownstream() {
+ Throwable ex = failures.get();
+ if (ex != null) {
+ downstream.onFailure(ex);
return;
}
- if (previousState == State.EMITTING) {
- currentUpstream.cancel();
- upstream.cancel();
- } else if (upstream != null) {
- upstream.cancel();
+ downstream.onCompletion();
+ }
+
+ @Override
+ public Context context() {
+ if (downstream instanceof ContextSupport) {
+ return ((ContextSupport) downstream).context();
+ } else {
+ return Context.empty();
}
}
- class InnerSubscriber implements MultiSubscriber, ContextSupport {
+ }
- @Override
- public void onSubscribe(Subscription subscription) {
- if (state == State.CANCELLED) {
- return;
- }
- currentUpstream = subscription;
- state = State.EMITTING;
- long pending = demand;
- if (pending > 0L) {
- currentUpstream.request(pending);
- }
- }
+ static final class ConcatMapInner extends SwitchableSubscriptionSubscriber {
+ private final ConcatMapMainSubscriber, O> parent;
- @Override
- public void onItem(O item) {
- if (state == State.CANCELLED) {
- return;
- }
- DEMAND_UPDATER.decrementAndGet(ConcatMapSubscriber.this);
- downstream.onItem(item);
- }
+ long emitted;
- @Override
- public void onFailure(Throwable failure) {
- if (state == State.CANCELLED) {
- return;
- }
- state = State.WAITING_NEXT_PUBLISHER;
- Throwable err = addFailure(failure);
- if (postponeFailurePropagation) {
- onCompletion();
- } else {
- state = State.CANCELLED;
- upstream.cancel();
- downstream.onFailure(err);
- }
- }
+ /**
+ * Downstream passed as {@code null} to {@link SwitchableSubscriptionSubscriber} as accessors are not reachable.
+ * Effective downstream is {@code parent}.
+ *
+ * @param parent parent as downstream
+ */
+ ConcatMapInner(ConcatMapMainSubscriber, O> parent) {
+ super(null);
+ this.parent = parent;
+ }
- @Override
- public void onCompletion() {
- if (state == State.CANCELLED) {
- return;
- }
- if (!upstreamHasCompleted) {
- state = State.WAITING_NEXT_PUBLISHER;
- if (demand > 0L) {
- upstream.request(1L);
- }
- } else {
- state = State.CANCELLED;
- if (failure != null) {
- downstream.onFailure(failure);
- } else {
- downstream.onComplete();
- }
- }
+ @Override
+ public void onItem(O item) {
+ emitted++;
+ parent.tryEmit(item);
+ }
+
+ @Override
+ public void onFailure(Throwable failure) {
+ long p = emitted;
+
+ if (p != 0L) {
+ emitted = 0L;
+ emitted(p);
}
- @Override
- public Context context() {
- if (downstream instanceof ContextSupport) {
- return ((ContextSupport) downstream).context();
- } else {
- return Context.empty();
- }
+ parent.innerFailure(failure, p);
+ }
+
+ @Override
+ public void onCompletion() {
+ long p = emitted;
+
+ if (p != 0L) {
+ emitted = 0L;
+ emitted(p);
}
+
+ parent.innerComplete(p);
+ }
+
+ @Override
+ public Context context() {
+ return parent.context();
}
}
}
diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java
index 5785c8793..d9b12ebae 100644
--- a/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java
+++ b/implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiConcatMapNoPrefetchTest.java
@@ -15,6 +15,7 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.MultiFlatten;
@@ -36,20 +37,6 @@ void setUp() {
});
}
- @Test
- void simpleConcatMap() {
- AssertSubscriber sub = Multi.createFrom().range(1, 3)
- .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().items(n * 10, n * 20))
- .subscribe().withSubscriber(AssertSubscriber.create());
- sub.request(1);
- sub.assertItems(10);
- sub.request(2);
- sub.assertItems(10, 20, 20);
- sub.request(Long.MAX_VALUE);
- sub.assertItems(10, 20, 20, 40);
- sub.assertCompleted();
- }
-
@ParameterizedTest
@MethodSource("argsTransformToUni")
void testTransformToUni(boolean prefetch, int[] upstreamRequests) {
@@ -197,7 +184,7 @@ void testMapperReturningNullpostponeFailure() {
.concatenate();
AssertSubscriber ts = new AssertSubscriber<>(5);
result.subscribe(ts);
- ts.assertHasNotReceivedAnyItem().assertFailedWith(NullPointerException.class);
+ ts.assertHasNotReceivedAnyItem().assertFailedWith(CompositeException.class);
}
@Test
@@ -322,4 +309,18 @@ void testCancellation() {
assertThat(sub.hasCompleted()).isFalse();
assertThat(sub.getItems()).contains(1, 2, 3);
}
+
+ @Test
+ void simpleConcatMap() {
+ AssertSubscriber sub = Multi.createFrom().range(1, 3)
+ .onItem().transformToMultiAndConcatenate(n -> Multi.createFrom().items(n * 10, n * 20))
+ .subscribe().withSubscriber(AssertSubscriber.create());
+ sub.request(1);
+ sub.assertItems(10);
+ sub.request(2);
+ sub.assertItems(10, 20, 20);
+ sub.request(Long.MAX_VALUE);
+ sub.assertItems(10, 20, 20, 40);
+ sub.assertCompleted();
+ }
}