Skip to content

Commit

Permalink
fix: timing issue in MultiOnSubscribeCall where the Multi could termi…
Browse files Browse the repository at this point in the history
…nate before the Uni

Fixes #1678
  • Loading branch information
jponge committed Oct 2, 2024
1 parent 0f930a6 commit bd4d3ad
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package io.smallrye.mutiny.operators.multi;

import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;
import static io.smallrye.mutiny.helpers.Subscriptions.empty;

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

import io.smallrye.mutiny.Multi;
Expand Down Expand Up @@ -39,6 +41,11 @@ public void subscribe(MultiSubscriber<? super T> actual) {

private final class OnSubscribeSubscriber extends MultiOperatorProcessor<T, T> {

private final ReentrantLock lock = new ReentrantLock();
private Throwable failure;
private boolean terminatedEarly;
private boolean uniHasTerminated;

OnSubscribeSubscriber(MultiSubscriber<? super T> downstream) {
super(downstream);
}
Expand All @@ -48,13 +55,9 @@ public void onSubscribe(Flow.Subscription s) {
if (compareAndSetUpstreamSubscription(null, s)) {
try {
Uni<?> uni = Objects.requireNonNull(onSubscribe.apply(s), "The produced Uni must not be `null`");
uni
.subscribe().with(
ignored -> downstream.onSubscribe(this),
failure -> {
Subscriptions.fail(downstream, failure);
getAndSetUpstreamSubscription(CANCELLED).cancel();
});
uni.subscribe().with(
ignored -> uniCompleted(),
err -> uniFailed(err));
} catch (Throwable e) {
Subscriptions.fail(downstream, e);
getAndSetUpstreamSubscription(CANCELLED).cancel();
Expand All @@ -63,6 +66,61 @@ public void onSubscribe(Flow.Subscription s) {
s.cancel();
}
}

@Override
public void onFailure(Throwable throwable) {
lock.lock();
if (!uniHasTerminated) {
terminatedEarly = true;
this.failure = throwable;
lock.unlock();
} else {
lock.unlock();
super.onFailure(throwable);

Check warning on line 79 in implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeCall.java

View check run for this annotation

Codecov / codecov/patch

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeCall.java#L78-L79

Added lines #L78 - L79 were not covered by tests
}
}

@Override
public void onCompletion() {
lock.lock();
if (!uniHasTerminated) {
terminatedEarly = true;
lock.unlock();
} else {
lock.unlock();
super.onCompletion();
}
}

private void uniFailed(Throwable failure) {
getAndSetUpstreamSubscription(CANCELLED).cancel();
lock.lock();
uniHasTerminated = true;
if (this.failure == null) {
this.failure = failure;
} else {
this.failure.addSuppressed(failure);
}
lock.unlock();
Subscriptions.fail(downstream, this.failure);
}

private void uniCompleted() {
lock.lock();
uniHasTerminated = true;
lock.unlock();
if (terminatedEarly) {
getAndSetUpstreamSubscription(CANCELLED).cancel();
downstream.onSubscribe(empty());
if (this.failure != null) {
downstream.onFailure(failure);
} else {
downstream.onComplete();
}
} else {
downstream.onSubscribe(this);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.IOException;
import java.util.concurrent.*;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.ResourceAccessMode;
Expand Down Expand Up @@ -317,4 +323,82 @@ public void testRunSubscriptionOnShutdownExecutorRequests() {
subscriber.assertFailedWith(RejectedExecutionException.class, "");
}

@Test
public void testNoEarlyCompletionBeforeCallCompletes() {
AtomicReference<Subscription> callSubscriptionRef = new AtomicReference<>();

AssertSubscriber<Object> testSubscriber = Multi.createFrom().items(Stream::empty)
.onSubscription().call(sub -> Uni.createFrom().item(sub)
.onItem().delayIt().by(Duration.ofSeconds(1L))
.onItem().invoke(callSubscriptionRef::set))
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

testSubscriber.awaitCompletion().assertSubscribed().assertHasNotReceivedAnyItem();
assertThat(callSubscriptionRef.get()).isNotNull();
}

@Test
public void testNoEarlyFailureBeforeCallCompletes() {
AtomicReference<Subscription> callSubscriptionRef = new AtomicReference<>();

AssertSubscriber<Object> testSubscriber = Multi.createFrom().emitter(emitter -> {
emitter.fail(new IOException("boom"));
}).onSubscription().call(sub -> Uni.createFrom().item(sub)
.onItem().delayIt().by(Duration.ofSeconds(1L))
.onItem().invoke(callSubscriptionRef::set))
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

testSubscriber
.awaitFailure()
.assertSubscribed()
.assertHasNotReceivedAnyItem()
.assertFailedWith(IOException.class, "boom");
assertThat(callSubscriptionRef.get()).isNotNull();
}

@Test
public void testNoEarlyFailureBeforeCallFails() {
AtomicBoolean sync = new AtomicBoolean();
AssertSubscriber<Object> testSubscriber = Multi.createFrom().emitter(emitter -> {
emitter.fail(new IOException("boom"));
sync.set(true);
})
.onSubscription().call(sub -> Uni.createFrom()
.emitter(uniEmitter -> {
await().untilTrue(sync);
uniEmitter.fail(new RuntimeException("woops"));
})
.runSubscriptionOn(Infrastructure.getDefaultExecutor()))
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

testSubscriber
.awaitFailure()
.assertSubscribed()
.assertHasNotReceivedAnyItem()
.assertFailedWith(IOException.class, "boom");
Throwable failure = testSubscriber.getFailure();
assertThat(failure).hasSuppressedException(new RuntimeException("woops"));
}

@Test
public void testNoEarlyCompletionAfterCallFails() {
AtomicBoolean sync = new AtomicBoolean();
AssertSubscriber<Object> testSubscriber = Multi.createFrom().emitter(emitter -> {
await().untilTrue(sync);
emitter.complete();
})
.onSubscription().call(sub -> Uni.createFrom()
.emitter(uniEmitter -> {
uniEmitter.fail(new RuntimeException("woops"));
sync.set(true);
})
.runSubscriptionOn(Infrastructure.getDefaultExecutor()))
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

testSubscriber
.awaitFailure()
.assertSubscribed()
.assertHasNotReceivedAnyItem()
.assertFailedWith(RuntimeException.class, "woops");
}
}

0 comments on commit bd4d3ad

Please sign in to comment.