Skip to content

Commit

Permalink
Support Serde detection for Instance injection of channels
Browse files Browse the repository at this point in the history
Resolves #44500
  • Loading branch information
ozangunalp committed Nov 15, 2024
1 parent 65b55fd commit e0b1335
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

final class DotNames {
// @formatter:off
static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName());
static final DotName INJECTABLE_INSTANCE = DotName.createSimple(io.quarkus.arc.InjectableInstance.class.getName());
static final DotName PROVIDER = DotName.createSimple(jakarta.inject.Provider.class.getName());
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.MethodParameterInfo;
import org.jboss.jandex.Type;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -354,15 +353,19 @@ private void processIncomingType(DefaultSerdeDiscoveryState discovery,
}

private Type getInjectionPointType(AnnotationInstance annotation) {
switch (annotation.target().kind()) {
case FIELD:
return annotation.target().asField().type();
case METHOD_PARAMETER:
MethodParameterInfo parameter = annotation.target().asMethodParameter();
return parameter.method().parameterType(parameter.position());
default:
return null;
}
return switch (annotation.target().kind()) {
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type());
default -> null;
};
}

private Type handleInstanceChannelInjection(Type type) {
return (DotNames.INSTANCE.equals(type.name())
|| DotNames.PROVIDER.equals(type.name())
|| DotNames.INJECTABLE_INSTANCE.equals(type.name()))
? type.asParameterizedType().arguments().get(0)
: type;
}

private void handleAdditionalProperties(String channelName, boolean incoming, DefaultSerdeDiscoveryState discovery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;

import org.apache.avro.generic.GenericRecord;
Expand Down Expand Up @@ -41,6 +42,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import io.quarkus.arc.InjectableInstance;
import io.quarkus.commons.classloading.ClassLoaderHelper;
import io.quarkus.deployment.builditem.GeneratedClassBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
Expand Down Expand Up @@ -111,6 +113,7 @@ boolean isKafkaConnector(List<ConnectorManagedChannelBuildItem> list, boolean in

assertThat(configs)
.extracting(RunTimeConfigurationDefaultBuildItem::getKey, RunTimeConfigurationDefaultBuildItem::getValue)
.hasSize(expectations.length)
.allSatisfy(tuple -> {
Object[] e = tuple.toArray();
String key = (String) e[0];
Expand Down Expand Up @@ -881,7 +884,7 @@ public void avroDtoInGenericRecordOut() {
tuple("mp.messaging.outgoing.channel33.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"),
tuple("mp.messaging.incoming.channel34.value.deserializer", "io.apicurio.registry.serde.avro.AvroKafkaDeserializer"),
tuple("mp.messaging.incoming.channel34.apicurio.registry.use-specific-avro-reader", "true"),
tuple("mp.messaging.outgoing.channel35.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"),
tuple("mp.messaging.outgoing.channel35.value.serializer", "io.apcurio.registry.serde.avro.AvroKafkaSerializer"),
tuple("mp.messaging.incoming.channel36.value.deserializer", "io.apicurio.registry.serde.avro.AvroKafkaDeserializer"),
tuple("mp.messaging.incoming.channel36.apicurio.registry.use-specific-avro-reader", "true"),
tuple("mp.messaging.outgoing.channel37.value.serializer", "io.apicurio.registry.serde.avro.AvroKafkaSerializer"),
Expand Down Expand Up @@ -3048,5 +3051,26 @@ private static class ChannelChildSerializer {
Multi<JsonbDto> channel2;
}

@Test
void instanceInjectionPoint() {
Tuple[] expectations = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
tuple("mp.messaging.incoming.channel2.value.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"),
tuple("mp.messaging.outgoing.channel3.value.serializer", "org.apache.kafka.common.serialization.DoubleSerializer"),
};
doTest(expectations, InstanceInjectionPoint.class);
}

private static class InstanceInjectionPoint {
@Channel("channel1")
Instance<Emitter<String>> emitter1;

@Channel("channel2")
Instance<Multi<Integer>> channel2;

@Channel("channel3")
InjectableInstance<MutinyEmitter<Double>> channel3;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

final class DotNames {
// @formatter:off
static final DotName INSTANCE = DotName.createSimple(jakarta.enterprise.inject.Instance.class.getName());
static final DotName INJECTABLE_INSTANCE = DotName.createSimple(io.quarkus.arc.InjectableInstance.class.getName());
static final DotName PROVIDER = DotName.createSimple(jakarta.inject.Provider.class.getName());
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
static final DotName INCOMINGS = DotName.createSimple(io.smallrye.reactive.messaging.annotations.Incomings.class.getName());
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.DotName;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.MethodParameterInfo;
import org.jboss.jandex.Type;
import org.jboss.logging.Logger;

Expand Down Expand Up @@ -144,15 +143,19 @@ private static String incomingSchemaKey(String channelName) {
}

private Type getInjectionPointType(AnnotationInstance annotation) {
switch (annotation.target().kind()) {
case FIELD:
return annotation.target().asField().type();
case METHOD_PARAMETER:
MethodParameterInfo parameter = annotation.target().asMethodParameter();
return parameter.method().parameterType(parameter.position());
default:
return null;
}
return switch (annotation.target().kind()) {
case FIELD -> handleInstanceChannelInjection(annotation.target().asField().type());
case METHOD_PARAMETER -> handleInstanceChannelInjection(annotation.target().asMethodParameter().type());
default -> null;
};
}

private Type handleInstanceChannelInjection(Type type) {
return (DotNames.INSTANCE.equals(type.name())
|| DotNames.PROVIDER.equals(type.name())
|| DotNames.INJECTABLE_INSTANCE.equals(type.name()))
? type.asParameterizedType().arguments().get(0)
: type;
}

private void produceRuntimeConfigurationDefaultBuildItem(DefaultSchemaDiscoveryState discovery,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import jakarta.inject.Provider;

import org.apache.avro.specific.AvroGenerated;
import org.apache.pulsar.client.api.Messages;
Expand All @@ -40,6 +42,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

import io.quarkus.arc.InjectableInstance;
import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
import io.quarkus.commons.classloading.ClassLoaderHelper;
import io.quarkus.deployment.annotations.BuildProducer;
Expand Down Expand Up @@ -2108,5 +2111,26 @@ Multi<GenericPayload<Long>> method4() {
}
}

@Test
void instanceInjectionPoint() {
Tuple[] expectations = {
tuple("mp.messaging.outgoing.channel1.schema", "STRING"),
tuple("mp.messaging.incoming.channel2.schema", "INT32"),
tuple("mp.messaging.outgoing.channel3.schema", "DOUBLE"),
};
doTest(expectations, InstanceInjectionPoint.class);
}

private static class InstanceInjectionPoint {
@Channel("channel1")
Instance<Emitter<String>> emitter1;

@Channel("channel2")
Provider<Multi<Integer>> channel2;

@Channel("channel3")
InjectableInstance<MutinyEmitter<Double>> channel3;
}


}

0 comments on commit e0b1335

Please sign in to comment.