From c40eb94bb8be932b53c9dd29b8262ddf19fde1f0 Mon Sep 17 00:00:00 2001 From: Ozan Gunalp Date: Thu, 4 Apr 2024 15:07:59 +0200 Subject: [PATCH] Fix for messaging blocking signatures execution mode --- .../SmallRyeReactiveMessagingProcessor.java | 22 +++++++++---------- .../BlockingSignatureExecutionModeTest.java | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index eb99ec0a2f816..1f5a629d49fe5 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -65,8 +65,8 @@ import io.quarkus.runtime.metrics.MetricsFactory; import io.quarkus.runtime.util.HashUtil; import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem; -import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelBuildItem; import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection; +import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem; import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem; import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem; import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem; @@ -233,7 +233,7 @@ public void enableHealth(ReactiveMessagingBuildTimeConfig buildTimeConfig, public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext recorderContext, BuildProducer syntheticBeans, List mediatorMethods, - List channelBuildItems, + List connectorManagedChannels, List emitterFields, List channelFields, BuildProducer generatedClass, @@ -243,9 +243,9 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true); - Set innerIncomingChannelNames = channelBuildItems.stream() - .filter(c -> !c.isManagedByAConnector() && c.getDirection() == ChannelDirection.INCOMING) - .map(ChannelBuildItem::getName) + Set connectorManagedIncomingChannels = connectorManagedChannels.stream() + .filter(c -> c.getDirection() == ChannelDirection.INCOMING) + .map(ConnectorManagedChannelBuildItem::getName) .collect(Collectors.toSet()); List mediatorConfigurations = new ArrayList<>(mediatorMethods.size()); @@ -288,9 +288,9 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re QuarkusMediatorConfiguration mediatorConfiguration = QuarkusMediatorConfigurationUtil .create(methodInfo, isSuspendMethod, bean, recorderContext, Thread.currentThread().getContextClassLoader(), conf.strict, - consumesFromInnerChannel(methodInfo, innerIncomingChannelNames) - ? ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP // disable execution mode setting for inner channels - : conf.blockingSignaturesExecutionMode); + consumesFromConnector(methodInfo, connectorManagedIncomingChannels) + ? conf.blockingSignaturesExecutionMode + : ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP); // disable execution mode setting for inner channels mediatorConfigurations.add(mediatorConfiguration); String generatedInvokerName = generateInvoker(bean, methodInfo, isSuspendMethod, mediatorConfiguration, @@ -577,14 +577,14 @@ private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) { })); } - boolean consumesFromInnerChannel(MethodInfo methodInfo, Set innerChannelNames) { + boolean consumesFromConnector(MethodInfo methodInfo, Set connectorManagedChannels) { AnnotationInstance incoming = methodInfo.annotation(INCOMING); if (incoming != null) { - return innerChannelNames.contains(incoming.value().asString()); + return connectorManagedChannels.contains(incoming.value().asString()); } AnnotationInstance incomings = methodInfo.annotation(INCOMINGS); if (incomings != null) { - return innerChannelNames.containsAll( + return connectorManagedChannels.containsAll( Arrays.stream(incomings.value().asNestedArray()) .map(i -> i.value().asString()).collect(Collectors.toSet())); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java index c44f00fd86571..8dfe1e5d2ae3b 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/signatures/BlockingSignatureExecutionModeTest.java @@ -31,7 +31,7 @@ public class BlockingSignatureExecutionModeTest { BlockingConsumerFromConnector.class, ConsumerFromConnector.class, ConsumerFromInnerChannel.class)) - .overrideConfigKey("mp.messaging.incoming.a.connector", "dummy") + // .overrideConfigKey("mp.messaging.incoming.a.connector", "dummy") // discovered by the extension .overrideConfigKey("mp.messaging.incoming.a.values", "bonjour") .overrideConfigKey("mp.messaging.incoming.b.connector", "dummy") .overrideConfigKey("mp.messaging.incoming.b.values", "bonjour")