Skip to content

Commit

Permalink
Merge pull request quarkusio#39882 from ozangunalp/fix_messaging_bloc…
Browse files Browse the repository at this point in the history
…king_signatures

Quarkus Messaging Blocking Signatures execution mode fix for inner channels
  • Loading branch information
ozangunalp authored Apr 4, 2024
2 parents 0e2c02f + c40eb94 commit d752e8a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
import io.quarkus.runtime.metrics.MetricsFactory;
import io.quarkus.runtime.util.HashUtil;
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
Expand Down Expand Up @@ -233,7 +233,7 @@ public void enableHealth(ReactiveMessagingBuildTimeConfig buildTimeConfig,
public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext recorderContext,
BuildProducer<SyntheticBeanBuildItem> syntheticBeans,
List<MediatorBuildItem> mediatorMethods,
List<ChannelBuildItem> channelBuildItems,
List<ConnectorManagedChannelBuildItem> connectorManagedChannels,
List<InjectedEmitterBuildItem> emitterFields,
List<InjectedChannelBuildItem> channelFields,
BuildProducer<GeneratedClassBuildItem> generatedClass,
Expand All @@ -243,9 +243,9 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re

ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);

Set<String> innerIncomingChannelNames = channelBuildItems.stream()
.filter(c -> !c.isManagedByAConnector() && c.getDirection() == ChannelDirection.INCOMING)
.map(ChannelBuildItem::getName)
Set<String> connectorManagedIncomingChannels = connectorManagedChannels.stream()
.filter(c -> c.getDirection() == ChannelDirection.INCOMING)
.map(ConnectorManagedChannelBuildItem::getName)
.collect(Collectors.toSet());

List<QuarkusMediatorConfiguration> mediatorConfigurations = new ArrayList<>(mediatorMethods.size());
Expand Down Expand Up @@ -288,9 +288,9 @@ public void build(SmallRyeReactiveMessagingRecorder recorder, RecorderContext re
QuarkusMediatorConfiguration mediatorConfiguration = QuarkusMediatorConfigurationUtil
.create(methodInfo, isSuspendMethod, bean, recorderContext,
Thread.currentThread().getContextClassLoader(), conf.strict,
consumesFromInnerChannel(methodInfo, innerIncomingChannelNames)
? ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP // disable execution mode setting for inner channels
: conf.blockingSignaturesExecutionMode);
consumesFromConnector(methodInfo, connectorManagedIncomingChannels)
? conf.blockingSignaturesExecutionMode
: ReactiveMessagingConfiguration.ExecutionMode.EVENT_LOOP); // disable execution mode setting for inner channels
mediatorConfigurations.add(mediatorConfiguration);

String generatedInvokerName = generateInvoker(bean, methodInfo, isSuspendMethod, mediatorConfiguration,
Expand Down Expand Up @@ -577,14 +577,14 @@ private boolean doesImplement(ClassInfo clazz, DotName iface, IndexView index) {
}));
}

boolean consumesFromInnerChannel(MethodInfo methodInfo, Set<String> innerChannelNames) {
boolean consumesFromConnector(MethodInfo methodInfo, Set<String> connectorManagedChannels) {
AnnotationInstance incoming = methodInfo.annotation(INCOMING);
if (incoming != null) {
return innerChannelNames.contains(incoming.value().asString());
return connectorManagedChannels.contains(incoming.value().asString());
}
AnnotationInstance incomings = methodInfo.annotation(INCOMINGS);
if (incomings != null) {
return innerChannelNames.containsAll(
return connectorManagedChannels.containsAll(
Arrays.stream(incomings.value().asNestedArray())
.map(i -> i.value().asString()).collect(Collectors.toSet()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class BlockingSignatureExecutionModeTest {
BlockingConsumerFromConnector.class,
ConsumerFromConnector.class,
ConsumerFromInnerChannel.class))
.overrideConfigKey("mp.messaging.incoming.a.connector", "dummy")
// .overrideConfigKey("mp.messaging.incoming.a.connector", "dummy") // discovered by the extension
.overrideConfigKey("mp.messaging.incoming.a.values", "bonjour")
.overrideConfigKey("mp.messaging.incoming.b.connector", "dummy")
.overrideConfigKey("mp.messaging.incoming.b.values", "bonjour")
Expand Down

0 comments on commit d752e8a

Please sign in to comment.