Skip to content

Commit

Permalink
Messaging extensions default execution mode to worker thread for the …
Browse files Browse the repository at this point in the history
…signatures considered blocking
  • Loading branch information
ozangunalp committed Mar 28, 2024
1 parent a634bc4 commit 01b2ff4
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.NON_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
Expand Down Expand Up @@ -36,6 +37,7 @@
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand All @@ -50,7 +52,7 @@ private QuarkusMediatorConfigurationUtil() {

public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean isSuspendMethod, BeanInfo bean,
RecorderContext recorderContext,
ClassLoader cl, boolean strict) {
ClassLoader cl, boolean strict, ReactiveMessagingConfiguration.ExecutionMode executionMode) {

Class[] parameterTypeClasses;
Class<?> returnTypeClass;
Expand Down Expand Up @@ -180,6 +182,22 @@ public Integer get() {
}
}));
configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput());
if (!hasBlockingAnnotation(methodInfo)
&& !hasNonBlockingAnnotation(methodInfo)
&& hasBlockingPayloadSignature(methodInfo)) {
switch (executionMode) {
case WORKER:
configuration.setBlocking(true);
configuration.setBlockingExecutionOrdered(true);
break;
case VIRTUAL_THREAD:
configuration.setBlocking(true);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
break;
default:
break;
}
}

AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
Expand Down Expand Up @@ -452,4 +470,21 @@ private static Type getGenericParameterType(MethodInfo method, int paramIndex) {
return parameters.get(paramIndex);
}
}

private static boolean hasNonBlockingAnnotation(MethodInfo method) {
return method.hasAnnotation(NON_BLOCKING);
}

public static boolean hasBlockingAnnotation(MethodInfo method) {
return method.hasAnnotation(BLOCKING)
|| method.hasAnnotation(SMALLRYE_BLOCKING)
|| method.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
|| method.hasAnnotation(TRANSACTIONAL);
}

private static boolean hasBlockingPayloadSignature(MethodInfo methodInfo) {
return !ReactiveMessagingDotNames.UNI.equals(methodInfo.returnType().name())
&& !ReactiveMessagingDotNames.MULTI.equals(methodInfo.returnType().name())
&& !ReactiveMessagingDotNames.COMPLETION_STAGE.equals(methodInfo.returnType().name());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Blocking;
Expand Down Expand Up @@ -46,6 +49,7 @@ public final class ReactiveMessagingDotNames {
static final DotName CONNECTOR_ATTRIBUTE = DotName.createSimple(ConnectorAttribute.class.getName());

static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName());
static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName());
public static final DotName CHANNEL = DotName
.createSimple(org.eclipse.microprofile.reactive.messaging.Channel.class.getName());
public static final DotName LEGACY_CHANNEL = DotName.createSimple(Channel.class.getName());
Expand Down Expand Up @@ -94,6 +98,9 @@ public final class ReactiveMessagingDotNames {
static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");
static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

static final DotName UNI = DotName.createSimple(Uni.class.getName());
static final DotName MULTI = DotName.createSimple(Multi.class.getName());

private ReactiveMessagingDotNames() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
Expand All @@ -13,6 +12,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Vetoed;
Expand Down Expand Up @@ -64,6 +65,8 @@
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
Expand Down Expand Up @@ -230,6 +233,7 @@ public void enableHealth(ReactiveMessagingBuildTimeConfig buildTimeConfig,
public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext recorderContext,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
List<MediatorBuildItem> mediatorMethods,
List<ChannelBuildItem> channelBuildItems,
List<InjectedEmitterBuildItem> emitterFields,
List<InjectedChannelBuildItem> channelFields,
BuildProducer<GeneratedClassBuildItem> generatedClass,
Expand All @@ -239,6 +243,11 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re

ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);

Set<String> innerIncomingChannelNames = channelBuildItems.stream()
.filter(c -> !c.isManagedByAConnector() && c.getDirection() == ChannelDirection.INCOMING)
.map(ChannelBuildItem::getName)
.collect(Collectors.toSet());

List<QuarkusMediatorConfiguration> mediatorConfigurations = new ArrayList<>(mediatorMethods.size());
List<WorkerConfiguration> workerConfigurations = new ArrayList<>();
Map<String, EmitterConfiguration> emittersConfigurations = new HashMap<>();
Expand All @@ -253,9 +262,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
MethodInfo methodInfo = mediatorMethod.getMethod();
BeanInfo bean = mediatorMethod.getBean();

if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
|| methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
|| methodInfo.hasAnnotation(TRANSACTIONAL)) {
if (QuarkusMediatorConfigurationUtil.hasBlockingAnnotation(methodInfo)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
? QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER
Expand All @@ -280,7 +287,10 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re

QuarkusMediatorConfiguration mediatorConfiguration = QuarkusMediatorConfigurationUtil
.create(methodInfo, isSuspendMethod, bean, recorderContext,
Thread.currentThread().getContextClassLoader(), conf.strict);
Thread.currentThread().getContextClassLoader(), conf.strict,
consumesFromInnerChannel(methodInfo, innerIncomingChannelNames)
? ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP // disable execution mode setting for inner channels
: conf.blockingSignaturesExecutionMode);
mediatorConfigurations.add(mediatorConfiguration);

String generatedInvokerName = generateInvoker(bean, methodInfo, isSuspendMethod, mediatorConfiguration,
Expand Down Expand Up @@ -567,4 +577,18 @@ private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) {
}));
}

boolean consumesFromInnerChannel(MethodInfo methodInfo, Set<String> innerChannelNames) {
AnnotationInstance incoming = methodInfo.annotation(INCOMING);
if (incoming != null) {
return innerChannelNames.contains(incoming.value().asString());
}
AnnotationInstance incomings = methodInfo.annotation(INCOMINGS);
if (incomings != null) {
return innerChannelNames.containsAll(
Arrays.stream(incomings.value().asNestedArray())
.map(i -> i.value().asString()).collect(Collectors.toSet()));
}
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package io.quarkus.smallrye.reactivemessaging.signatures;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.smallrye.reactivemessaging.config.DumbConnector;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class BlockingSignatureExecutionModeTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(DumbConnector.class,
ProducerOnC.class,
BlockingConsumerFromConnector.class,
ConsumerFromConnector.class,
ConsumerFromInnerChannel.class))
.overrideConfigKey("mp.messaging.incoming.a.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.a.values", "bonjour")
.overrideConfigKey("mp.messaging.incoming.b.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.b.values", "bonjour")
.overrideConfigKey("mp.messaging.incoming.c.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.c.values", "bonjour");

@Inject
BlockingConsumerFromConnector blockingConsumerFromConnector;

@Test
public void testBlockingSignatureFromConnector() {
await().until(() -> blockingConsumerFromConnector.list().size() == 2);
List<String> threadNames = blockingConsumerFromConnector.threads().stream().distinct().toList();
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
for (String name : threadNames) {
assertThat(name.startsWith("executor-thread-")).isTrue();
}
}

@ApplicationScoped
public static class BlockingConsumerFromConnector {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("a")
public void produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@Inject
ConsumerFromConnector consumerFromConnector;

@Test
public void testNonBlockingSignatureFromConnector() {
await().until(() -> consumerFromConnector.list().size() == 2);
List<String> threadNames = consumerFromConnector.threads().stream().distinct().toList();
assertThat(threadNames).containsOnly(Thread.currentThread().getName());
}

@ApplicationScoped
public static class ConsumerFromConnector {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("b")
public Uni<Void> produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
return Uni.createFrom().voidItem();
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@Inject
NonBlockingConsumerFromConnector nonBlockingConsumerFromConnector;

@Test
public void testNonBlockingAnnotationFromConnector() {
await().until(() -> nonBlockingConsumerFromConnector.list().size() == 2);
List<String> threadNames = nonBlockingConsumerFromConnector.threads().stream().distinct().toList();
assertThat(threadNames).containsOnly(Thread.currentThread().getName());
}

@ApplicationScoped
public static class NonBlockingConsumerFromConnector {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("c")
@NonBlocking
public void produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@Inject
ConsumerFromInnerChannel consumerFromInnerChannel;

@Test
public void testBlockingSignatureFromInnerChannel() {
await().until(() -> consumerFromInnerChannel.list().size() == 3);
assertThat(consumerFromInnerChannel.list()).containsExactly("d", "e", "f");
List<String> threadNames = consumerFromInnerChannel.threads().stream().distinct().toList();
assertThat(threadNames).containsOnly(Thread.currentThread().getName());
}

@ApplicationScoped
public static class ConsumerFromInnerChannel {

private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("d")
public void produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@ApplicationScoped
private static class ProducerOnC {

@Outgoing("d")
public Flow.Publisher<String> produce() {
return Multi.createFrom().items("d", "e", "f");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,17 @@ public class ReactiveMessagingConfiguration {
*/
@ConfigItem(name = "strict", defaultValue = "false")
public boolean strict;

/**
* Execution mode for the Messaging signatures considered "blocking", defaults to "worker".
* For the previous behaviour set to "event-loop".
*/
@ConfigItem(name = "blocking.signatures.execution.mode", defaultValue = "worker")
public ExecutionMode blockingSignaturesExecutionMode;

public enum ExecutionMode {
EVENT_LOOP,
WORKER,
VIRTUAL_THREAD
}
}

0 comments on commit 01b2ff4

Please sign in to comment.