Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to Mutiny 2.6.0 and Reactive Messaging 4.20.0 #39768

Merged
merged 4 commits into from
Mar 29, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Messaging extensions default execution mode to worker thread for the …
…signatures considered blocking
ozangunalp committed Mar 28, 2024

Verified

This commit was signed with the committer’s verified signature.
ozangunalp Ozan Gunalp
commit 01b2ff48ea44e5149628267190c9062fe40edb3b
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.KOTLIN_UNIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.MERGE;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.NON_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.OUTGOINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
@@ -36,6 +37,7 @@
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusParameterDescriptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.TypeInfo;
import io.smallrye.reactive.messaging.Shape;
import io.smallrye.reactive.messaging.annotations.Blocking;
@@ -50,7 +52,7 @@ private QuarkusMediatorConfigurationUtil() {

public static QuarkusMediatorConfiguration create(MethodInfo methodInfo, boolean isSuspendMethod, BeanInfo bean,
RecorderContext recorderContext,
ClassLoader cl, boolean strict) {
ClassLoader cl, boolean strict, ReactiveMessagingConfiguration.ExecutionMode executionMode) {

Class[] parameterTypeClasses;
Class<?> returnTypeClass;
@@ -180,6 +182,22 @@ public Integer get() {
}
}));
configuration.setHasTargetedOutput(mediatorConfigurationSupport.processTargetedOutput());
if (!hasBlockingAnnotation(methodInfo)
&& !hasNonBlockingAnnotation(methodInfo)
&& hasBlockingPayloadSignature(methodInfo)) {
switch (executionMode) {
case WORKER:
configuration.setBlocking(true);
configuration.setBlockingExecutionOrdered(true);
break;
case VIRTUAL_THREAD:
configuration.setBlocking(true);
configuration.setWorkerPoolName(QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER);
break;
default:
break;
}
}

AnnotationInstance blockingAnnotation = methodInfo.annotation(BLOCKING);
AnnotationInstance smallryeBlockingAnnotation = methodInfo.annotation(SMALLRYE_BLOCKING);
@@ -452,4 +470,21 @@ private static Type getGenericParameterType(MethodInfo method, int paramIndex) {
return parameters.get(paramIndex);
}
}

private static boolean hasNonBlockingAnnotation(MethodInfo method) {
return method.hasAnnotation(NON_BLOCKING);
}

public static boolean hasBlockingAnnotation(MethodInfo method) {
return method.hasAnnotation(BLOCKING)
|| method.hasAnnotation(SMALLRYE_BLOCKING)
|| method.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
|| method.hasAnnotation(TRANSACTIONAL);
}

private static boolean hasBlockingPayloadSignature(MethodInfo methodInfo) {
return !ReactiveMessagingDotNames.UNI.equals(methodInfo.returnType().name())
&& !ReactiveMessagingDotNames.MULTI.equals(methodInfo.returnType().name())
&& !ReactiveMessagingDotNames.COMPLETION_STAGE.equals(methodInfo.returnType().name());
}
}
Original file line number Diff line number Diff line change
@@ -10,7 +10,10 @@
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.jboss.jandex.DotName;

import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.common.annotation.RunOnVirtualThread;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.MessageConverter;
import io.smallrye.reactive.messaging.MutinyEmitter;
import io.smallrye.reactive.messaging.annotations.Blocking;
@@ -46,6 +49,7 @@ public final class ReactiveMessagingDotNames {
static final DotName CONNECTOR_ATTRIBUTE = DotName.createSimple(ConnectorAttribute.class.getName());

static final DotName BLOCKING = DotName.createSimple(Blocking.class.getName());
static final DotName NON_BLOCKING = DotName.createSimple(NonBlocking.class.getName());
public static final DotName CHANNEL = DotName
.createSimple(org.eclipse.microprofile.reactive.messaging.Channel.class.getName());
public static final DotName LEGACY_CHANNEL = DotName.createSimple(Channel.class.getName());
@@ -94,6 +98,9 @@ public final class ReactiveMessagingDotNames {
static final DotName TRANSACTIONAL = DotName.createSimple("jakarta.transaction.Transactional");
static final DotName RUN_ON_VIRTUAL_THREAD = DotName.createSimple(RunOnVirtualThread.class.getName());

static final DotName UNI = DotName.createSimple(Uni.class.getName());
static final DotName MULTI = DotName.createSimple(Multi.class.getName());

private ReactiveMessagingDotNames() {
}

Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package io.quarkus.smallrye.reactivemessaging.deployment;

import static io.quarkus.deployment.annotations.ExecutionTime.STATIC_INIT;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.INCOMINGS;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.RUN_ON_VIRTUAL_THREAD;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.SMALLRYE_BLOCKING;
import static io.quarkus.smallrye.reactivemessaging.deployment.ReactiveMessagingDotNames.TRANSACTIONAL;

import java.lang.reflect.Modifier;
import java.util.ArrayList;
@@ -13,6 +12,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Vetoed;
@@ -64,6 +65,8 @@
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
@@ -230,6 +233,7 @@ public void enableHealth(ReactiveMessagingBuildTimeConfig buildTimeConfig,
public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext recorderContext,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
List<MediatorBuildItem> mediatorMethods,
List<ChannelBuildItem> channelBuildItems,
List<InjectedEmitterBuildItem> emitterFields,
List<InjectedChannelBuildItem> channelFields,
BuildProducer<GeneratedClassBuildItem> generatedClass,
@@ -239,6 +243,11 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re

ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);

Set<String> innerIncomingChannelNames = channelBuildItems.stream()
.filter(c -> !c.isManagedByAConnector() && c.getDirection() == ChannelDirection.INCOMING)
.map(ChannelBuildItem::getName)
.collect(Collectors.toSet());

List<QuarkusMediatorConfiguration> mediatorConfigurations = new ArrayList<>(mediatorMethods.size());
List<WorkerConfiguration> workerConfigurations = new ArrayList<>();
Map<String, EmitterConfiguration> emittersConfigurations = new HashMap<>();
@@ -253,9 +262,7 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
MethodInfo methodInfo = mediatorMethod.getMethod();
BeanInfo bean = mediatorMethod.getBean();

if (methodInfo.hasAnnotation(BLOCKING) || methodInfo.hasAnnotation(SMALLRYE_BLOCKING)
|| methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
|| methodInfo.hasAnnotation(TRANSACTIONAL)) {
if (QuarkusMediatorConfigurationUtil.hasBlockingAnnotation(methodInfo)) {
// Just in case both annotation are used, use @Blocking value.
String poolName = methodInfo.hasAnnotation(RUN_ON_VIRTUAL_THREAD)
? QuarkusWorkerPoolRegistry.DEFAULT_VIRTUAL_THREAD_WORKER
@@ -280,7 +287,10 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re

QuarkusMediatorConfiguration mediatorConfiguration = QuarkusMediatorConfigurationUtil
.create(methodInfo, isSuspendMethod, bean, recorderContext,
Thread.currentThread().getContextClassLoader(), conf.strict);
Thread.currentThread().getContextClassLoader(), conf.strict,
consumesFromInnerChannel(methodInfo, innerIncomingChannelNames)
? ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP // disable execution mode setting for inner channels
: conf.blockingSignaturesExecutionMode);
mediatorConfigurations.add(mediatorConfiguration);

String generatedInvokerName = generateInvoker(bean, methodInfo, isSuspendMethod, mediatorConfiguration,
@@ -567,4 +577,18 @@ private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) {
}));
}

boolean consumesFromInnerChannel(MethodInfo methodInfo, Set<String> innerChannelNames) {
AnnotationInstance incoming = methodInfo.annotation(INCOMING);
if (incoming != null) {
return innerChannelNames.contains(incoming.value().asString());
}
AnnotationInstance incomings = methodInfo.annotation(INCOMINGS);
if (incomings != null) {
return innerChannelNames.containsAll(
Arrays.stream(incomings.value().asNestedArray())
.map(i -> i.value().asString()).collect(Collectors.toSet()));
}
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package io.quarkus.smallrye.reactivemessaging.signatures;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.quarkus.smallrye.reactivemessaging.config.DumbConnector;
import io.quarkus.test.QuarkusUnitTest;
import io.smallrye.common.annotation.NonBlocking;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

public class BlockingSignatureExecutionModeTest {

@RegisterExtension
static final QuarkusUnitTest config = new QuarkusUnitTest()
.withApplicationRoot((jar) -> jar
.addClasses(DumbConnector.class,
ProducerOnC.class,
BlockingConsumerFromConnector.class,
ConsumerFromConnector.class,
ConsumerFromInnerChannel.class))
.overrideConfigKey("mp.messaging.incoming.a.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.a.values", "bonjour")
.overrideConfigKey("mp.messaging.incoming.b.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.b.values", "bonjour")
.overrideConfigKey("mp.messaging.incoming.c.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.c.values", "bonjour");

@Inject
BlockingConsumerFromConnector blockingConsumerFromConnector;

@Test
public void testBlockingSignatureFromConnector() {
await().until(() -> blockingConsumerFromConnector.list().size() == 2);
List<String> threadNames = blockingConsumerFromConnector.threads().stream().distinct().toList();
assertThat(threadNames.contains(Thread.currentThread().getName())).isFalse();
for (String name : threadNames) {
assertThat(name.startsWith("executor-thread-")).isTrue();
}
}

@ApplicationScoped
public static class BlockingConsumerFromConnector {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("a")
public void produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@Inject
ConsumerFromConnector consumerFromConnector;

@Test
public void testNonBlockingSignatureFromConnector() {
await().until(() -> consumerFromConnector.list().size() == 2);
List<String> threadNames = consumerFromConnector.threads().stream().distinct().toList();
assertThat(threadNames).containsOnly(Thread.currentThread().getName());
}

@ApplicationScoped
public static class ConsumerFromConnector {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("b")
public Uni<Void> produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
return Uni.createFrom().voidItem();
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@Inject
NonBlockingConsumerFromConnector nonBlockingConsumerFromConnector;

@Test
public void testNonBlockingAnnotationFromConnector() {
await().until(() -> nonBlockingConsumerFromConnector.list().size() == 2);
List<String> threadNames = nonBlockingConsumerFromConnector.threads().stream().distinct().toList();
assertThat(threadNames).containsOnly(Thread.currentThread().getName());
}

@ApplicationScoped
public static class NonBlockingConsumerFromConnector {
private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("c")
@NonBlocking
public void produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@Inject
ConsumerFromInnerChannel consumerFromInnerChannel;

@Test
public void testBlockingSignatureFromInnerChannel() {
await().until(() -> consumerFromInnerChannel.list().size() == 3);
assertThat(consumerFromInnerChannel.list()).containsExactly("d", "e", "f");
List<String> threadNames = consumerFromInnerChannel.threads().stream().distinct().toList();
assertThat(threadNames).containsOnly(Thread.currentThread().getName());
}

@ApplicationScoped
public static class ConsumerFromInnerChannel {

private final List<String> list = new CopyOnWriteArrayList<>();
private final List<String> threads = new CopyOnWriteArrayList<>();

@Incoming("d")
public void produce(String s) {
threads.add(Thread.currentThread().getName());
list.add(s);
}

public List<String> threads() {
return threads;
}

public List<String> list() {
return list;
}
}

@ApplicationScoped
private static class ProducerOnC {

@Outgoing("d")
public Flow.Publisher<String> produce() {
return Multi.createFrom().items("d", "e", "f");
}

}
}
Original file line number Diff line number Diff line change
@@ -19,4 +19,17 @@ public class ReactiveMessagingConfiguration {
*/
@ConfigItem(name = "strict", defaultValue = "false")
public boolean strict;

/**
* Execution mode for the Messaging signatures considered "blocking", defaults to "worker".
* For the previous behaviour set to "event-loop".
*/
@ConfigItem(name = "blocking.signatures.execution.mode", defaultValue = "worker")
public ExecutionMode blockingSignaturesExecutionMode;

public enum ExecutionMode {
EVENT_LOOP,
WORKER,
VIRTUAL_THREAD
}
}

Unchanged files with check annotations Beta

== Sending messages to Pulsar
The Pulsar Connector can write Reactive Messaging `Message`s as Pulsar Message.

Check warning on line 458 in docs/src/main/asciidoc/pulsar.adoc

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/pulsar.adoc", "range": {"start": {"line": 458, "column": 62}}}, "severity": "INFO"}
=== Example
----
The incoming method can also receive `Message<List<Payload>>`, `Message<ConsumerRecords<Key, Payload>>`, and `ConsumerRecords<Key, Payload>` types.
They give access to record details such as offset or timestamp:

Check warning on line 720 in docs/src/main/asciidoc/kafka.adoc

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 720, "column": 30}}}, "severity": "INFO"}

Check warning on line 720 in docs/src/main/asciidoc/kafka.adoc

GitHub Actions / Linting with Vale

[vale] reported by reviewdog 🐶 [Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'. Raw Output: {"message": "[Quarkus.TermsSuggestions] Depending on the context, consider using 'because' or 'while' rather than 'as'.", "location": {"path": "docs/src/main/asciidoc/kafka.adoc", "range": {"start": {"line": 720, "column": 41}}}, "severity": "INFO"}
[source, java]
----