diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java index 5dd492dbbcb1f..8d3b71041bdca 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java @@ -4,6 +4,9 @@ final class DotNames { // @formatter:off + static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName()); + static final DotName INJECTABLE_INSTANCE = DotName.createSimple(io.quarkus.arc.InjectableInstance.class.getName()); + static final DotName PROVIDER = DotName.createSimple(jakarta.inject.Provider.class.getName()); static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName()); static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName()); static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName()); diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 96234fc7cc580..03c4133fe4edb 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -20,7 +20,6 @@ import org.jboss.jandex.DotName; import org.jboss.jandex.IndexView; import org.jboss.jandex.MethodInfo; -import org.jboss.jandex.MethodParameterInfo; import org.jboss.jandex.Type; import org.jboss.logging.Logger; @@ -354,15 +353,19 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery, } private Type getInjectionPointType(AnnotationInstance annotation) { - switch (annotation.target().kind()) { - case FIELD: - return annotation.target().asField().type(); - case METHOD_PARAMETER: - MethodParameterInfo parameter = annotation.target().asMethodParameter(); - return parameter.method().parameterType(parameter.position()); - default: - return null; - } + return switch (annotation.target().kind()) { + case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type()); + case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type()); + default -> null; + }; + } + + private Type handleInstanceChannelInjection(Type type) { + return (DotNames.INSTANCE.equals(type.name()) + || DotNames.PROVIDER.equals(type.name()) + || DotNames.INJECTABLE_INSTANCE.equals(type.name())) + ? type.asParameterizedType().arguments().get(0) + : type; } private void handleAdditionalProperties(String channelName, boolean incoming, DefaultSerdeDiscoveryState discovery, diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java index 05289111c374d..cc971ba643bcb 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import org.apache.avro.generic.GenericRecord; @@ -41,6 +42,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import io.quarkus.arc.InjectableInstance; import io.quarkus.commons.classloading.ClassLoaderHelper; import io.quarkus.deployment.builditem.GeneratedClassBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; @@ -111,6 +113,7 @@ boolean isKafkaConnector(List list, boolean in assertThat(configs) .extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue) + .hasSize(expectations.length) .allSatisfy(tuple -> { Object[] e = tuple.toArray(); String key = (String) e[0]; @@ -3048,5 +3051,26 @@ private static class ChannelChildSerializer { Multi channel2; } + @Test + void instanceInjectionPoint() { + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"), + tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"), + }; + doTest(expectations, InstanceInjectionPoint.class); + } + + private static class InstanceInjectionPoint { + @Channel("channel1") + Instance> emitter1; + + @Channel("channel2") + Instance> channel2; + + @Channel("channel3") + InjectableInstance> channel3; + } + } diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java index dc321cb954ec5..efff33c3c0b6c 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java @@ -4,6 +4,9 @@ final class DotNames { // @formatter:off + static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName()); + static final DotName INJECTABLE_INSTANCE = DotName.createSimple(io.quarkus.arc.InjectableInstance.class.getName()); + static final DotName PROVIDER = DotName.createSimple(jakarta.inject.Provider.class.getName()); static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName()); static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName()); static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName()); diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java index 3f88ab082a283..f5539127a0149 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java @@ -10,7 +10,6 @@ import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.DotName; import org.jboss.jandex.MethodInfo; -import org.jboss.jandex.MethodParameterInfo; import org.jboss.jandex.Type; import org.jboss.logging.Logger; @@ -144,15 +143,19 @@ private static String incomingSchemaKey(String channelName) { } private Type getInjectionPointType(AnnotationInstance annotation) { - switch (annotation.target().kind()) { - case FIELD: - return annotation.target().asField().type(); - case METHOD_PARAMETER: - MethodParameterInfo parameter = annotation.target().asMethodParameter(); - return parameter.method().parameterType(parameter.position()); - default: - return null; - } + return switch (annotation.target().kind()) { + case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type()); + case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type()); + default -> null; + }; + } + + private Type handleInstanceChannelInjection(Type type) { + return (DotNames.INSTANCE.equals(type.name()) + || DotNames.PROVIDER.equals(type.name()) + || DotNames.INJECTABLE_INSTANCE.equals(type.name())) + ? type.asParameterizedType().arguments().get(0) + : type; } private void produceRuntimeConfigurationDefaultBuildItem(DefaultSchemaDiscoveryState discovery, diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java index 898ade58c059d..b7e9e7e686948 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java @@ -15,8 +15,10 @@ import java.util.concurrent.CompletionStage; import java.util.function.Supplier; +import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; +import jakarta.inject.Provider; import org.apache.avro.specific.AvroGenerated; import org.apache.pulsar.client.api.Messages; @@ -40,6 +42,7 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import io.quarkus.arc.InjectableInstance; import io.quarkus.arc.deployment.SyntheticBeanBuildItem; import io.quarkus.commons.classloading.ClassLoaderHelper; import io.quarkus.deployment.annotations.BuildProducer; @@ -2108,5 +2111,26 @@ Multi> method4() { } } + @Test + void instanceInjectionPoint() { + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.schema", "STRING"), + tuple("mp.messaging.incoming.channel2.schema", "INT32"), + tuple("mp.messaging.outgoing.channel3.schema", "DOUBLE"), + }; + doTest(expectations, InstanceInjectionPoint.class); + } + + private static class InstanceInjectionPoint { + @Channel("channel1") + Instance> emitter1; + + @Channel("channel2") + Provider> channel2; + + @Channel("channel3") + InjectableInstance> channel3; + } + }