diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java index 5dd492dbbcb1f..2ae0a6c5806c2 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java @@ -4,6 +4,7 @@ final class DotNames { // @formatter:off + static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName()); static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName()); static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName()); static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName()); diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java index 96234fc7cc580..97c36324dc36c 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java @@ -354,15 +354,15 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery, } private Type getInjectionPointType(AnnotationInstance annotation) { - switch (annotation.target().kind()) { - case FIELD: - return annotation.target().asField().type(); - case METHOD_PARAMETER: - MethodParameterInfo parameter = annotation.target().asMethodParameter(); - return parameter.method().parameterType(parameter.position()); - default: - return null; - } + return switch (annotation.target().kind()) { + case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type()); + case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type()); + default -> null; + }; + } + + private Type handleInstanceChannelInjection(Type type) { + return DotNames.INSTANCE.equals(type.name()) ? type.asParameterizedType().arguments().get(0) : type; } private void handleAdditionalProperties(String channelName, boolean incoming, DefaultSerdeDiscoveryState discovery, diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java index 05289111c374d..5fa38ac61ffb8 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java @@ -13,6 +13,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Function; +import jakarta.enterprise.inject.Instance; import jakarta.inject.Inject; import org.apache.avro.generic.GenericRecord; @@ -111,6 +112,7 @@ boolean isKafkaConnector(List list, boolean in assertThat(configs) .extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue) + .hasSize(expectations.length) .allSatisfy(tuple -> { Object[] e = tuple.toArray(); String key = (String) e[0]; @@ -3048,5 +3050,22 @@ private static class ChannelChildSerializer { Multi channel2; } + @Test + void instanceInjectionPoint() { + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"), + tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"), + }; + doTest(expectations, InstanceInjectionPoint.class); + } + + private static class InstanceInjectionPoint { + @Channel("channel1") + Instance> emitter1; + + @Channel("channel2") + Instance> channel2; + } + } diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java index dc321cb954ec5..4173d8f45717d 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DotNames.java @@ -4,6 +4,7 @@ final class DotNames { // @formatter:off + static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName()); static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName()); static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName()); static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName()); diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java index 3f88ab082a283..223769f65b11f 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/PulsarSchemaDiscoveryProcessor.java @@ -10,7 +10,6 @@ import org.jboss.jandex.AnnotationInstance; import org.jboss.jandex.DotName; import org.jboss.jandex.MethodInfo; -import org.jboss.jandex.MethodParameterInfo; import org.jboss.jandex.Type; import org.jboss.logging.Logger; @@ -144,15 +143,15 @@ private static String incomingSchemaKey(String channelName) { } private Type getInjectionPointType(AnnotationInstance annotation) { - switch (annotation.target().kind()) { - case FIELD: - return annotation.target().asField().type(); - case METHOD_PARAMETER: - MethodParameterInfo parameter = annotation.target().asMethodParameter(); - return parameter.method().parameterType(parameter.position()); - default: - return null; - } + return switch (annotation.target().kind()) { + case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type()); + case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type()); + default -> null; + }; + } + + private Type handleInstanceChannelInjection(Type type) { + return DotNames.INSTANCE.equals(type.name()) ? type.asParameterizedType().arguments().get(0) : type; } private void produceRuntimeConfigurationDefaultBuildItem(DefaultSchemaDiscoveryState discovery, diff --git a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java index 898ade58c059d..d0697b0521000 100644 --- a/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java +++ b/extensions/smallrye-reactive-messaging-pulsar/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/pulsar/deployment/DefaultSchemaConfigTest.java @@ -15,6 +15,7 @@ import java.util.concurrent.CompletionStage; import java.util.function.Supplier; +import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.Produces; import jakarta.inject.Inject; @@ -2108,5 +2109,22 @@ Multi> method4() { } } + @Test + void instanceInjectionPoint() { + Tuple[] expectations = { + tuple("mp.messaging.outgoing.channel1.schema", "STRING"), + tuple("mp.messaging.incoming.channel2.schema", "INT32"), + }; + doTest(expectations, InstanceInjectionPoint.class); + } + + private static class InstanceInjectionPoint { + @Channel("channel1") + Instance> emitter1; + + @Channel("channel2") + Instance> channel2; + } + }