Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vert.x EventBus: use codec selector for non-concrete types #36374

Merged
merged 1 commit into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/src/main/asciidoc/vertx-reference.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -712,9 +712,11 @@ Read xref:./virtual-threads.adoc[the virtual thread guide] for more details.

=== Use codecs

The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses codecs to _serialize_ and _deserialize_ objects.
The https://vertx.io/docs/vertx-core/java/#event_bus[Vert.x Event Bus] uses https://vertx.io/docs/vertx-core/java/#_message_codecs[codecs] to _serialize_ and _deserialize_ message objects.
Quarkus provides a default codec for local delivery.
So you can exchange objects as follows:
This codec is automatically used for return types and message body parameters of local consumers, i.e. methods annotated with `@ConsumeEvent` whete `ConsumeEvent#local() == true` (which is the default).

So that you can exchange the message objects as follows:

[source, java]
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
import static io.quarkus.vertx.deployment.VertxConstants.LOCAL_EVENT_BUS_CODEC;
import static io.quarkus.vertx.deployment.VertxConstants.UNI;

import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -17,11 +19,13 @@
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.AnnotationValue;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.MethodInfo;
import org.jboss.jandex.ParameterizedType;
import org.jboss.jandex.Type;
import org.jboss.jandex.Type.Kind;
import org.jboss.logging.Logger;

import io.quarkus.arc.deployment.BeanArchiveIndexBuildItem;
Expand All @@ -46,51 +50,71 @@ public void registerCodecs(
BeanArchiveIndexBuildItem beanArchiveIndexBuildItem,
CombinedIndexBuildItem combinedIndex,
BuildProducer<MessageCodecBuildItem> messageCodecs,
BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
BuildProducer<LocalCodecSelectorTypesBuildItem> localCodecSelectorTypes) {

final IndexView index = beanArchiveIndexBuildItem.getIndex();
Collection<AnnotationInstance> consumeEventAnnotationInstances = index.getAnnotations(CONSUME_EVENT);
Map<DotName, DotName> codecByTypes = new HashMap<>();
Set<DotName> selectorTypes = new HashSet<>();

for (AnnotationInstance consumeEventAnnotationInstance : consumeEventAnnotationInstances) {
AnnotationTarget typeTarget = consumeEventAnnotationInstance.target();
if (typeTarget.kind() != AnnotationTarget.Kind.METHOD) {
throw new UnsupportedOperationException("@ConsumeEvent annotation must target a method");
throw new IllegalStateException("@ConsumeEvent annotation must target a method");
}

AnnotationValue local = consumeEventAnnotationInstance.value("local");
boolean isLocal = local == null || local.asBoolean();
MethodInfo method = typeTarget.asMethod();
Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method);
Type codecTargetFromParameter = extractPayloadTypeFromParameter(method);

Type codecTargetFromParameter = extractPayloadTypeFromParameter(method);
// If the @ConsumeEvent set the codec, use this codec. It applies to the parameter
AnnotationValue codec = consumeEventAnnotationInstance.value("codec");
if (codec != null && codec.asClass().kind() == Type.Kind.CLASS) {
if (codecTargetFromParameter == null) {
throw new IllegalStateException("Invalid `codec` argument in @ConsumeEvent - no parameter");
}
codecByTypes.put(codecTargetFromParameter.name(), codec.asClass().asClassType().name());
} else if (codecTargetFromParameter != null) {
// Codec is not set, check if we have a built-in codec
if (!hasBuiltInCodec(codecTargetFromParameter)) {
// Ensure local delivery.
AnnotationValue local = consumeEventAnnotationInstance.value("local");
if (local != null && !local.asBoolean()) {
throw new UnsupportedOperationException(
"The generic message codec can only be used for local delivery,"
+ ", implement your own event bus codec for " + codecTargetFromParameter.name()
.toString());
} else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) {
} else if (codecTargetFromParameter != null && !hasBuiltInCodec(codecTargetFromParameter)) {
// Codec is not set and built-in codecs cannot be used
if (!isLocal) {
throw new IllegalStateException(
"The Local Message Codec can only be used for local delivery,"
+ " you will need to implement a message codec for " + codecTargetFromParameter.name()
.toString()
+ " and make use of @ConsumeEvent#codec()");
} else if (!codecByTypes.containsKey(codecTargetFromParameter.name())) {
if (isConcreteClass(codecTargetFromParameter, index)) {
// The default codec makes only sense for concrete classes
LOGGER.debugf("Local Message Codec registered for type %s",
codecTargetFromParameter);
codecByTypes.put(codecTargetFromParameter.name(), LOCAL_EVENT_BUS_CODEC);
} else {
LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromParameter);
selectorTypes.add(codecTargetFromParameter.name());
}
}
}

if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)
&& !codecByTypes.containsKey(codecTargetFromReturnType.name())) {

LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType);
codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC);
Type codecTargetFromReturnType = extractPayloadTypeFromReturn(method);
if (codecTargetFromReturnType != null && !hasBuiltInCodec(codecTargetFromReturnType)) {
if (!isLocal) {
throw new IllegalStateException(
"The Local Message Codec can only be used for local delivery,"
+ " you will need to modify the method to consume io.vertx.core.eventbus.Message, implement a message codec for "
+ codecTargetFromReturnType.name()
.toString()
+ " and make use of io.vertx.core.eventbus.DeliveryOptions");
} else if (!codecByTypes.containsKey(codecTargetFromReturnType.name())) {
if (isConcreteClass(codecTargetFromReturnType, index)) {
// The default codec makes only sense for concrete classes
LOGGER.debugf("Local Message Codec registered for type %s", codecTargetFromReturnType);
codecByTypes.put(codecTargetFromReturnType.name(), LOCAL_EVENT_BUS_CODEC);
} else {
LOGGER.debugf("Local Message Codec will be selected for type %s", codecTargetFromReturnType);
selectorTypes.add(codecTargetFromReturnType.name());
}
}
}
}

Expand Down Expand Up @@ -133,6 +157,9 @@ public void accept(String name) {
reflectiveClass.produce(ReflectiveClassBuildItem.builder(name).methods().build());
}
});

localCodecSelectorTypes.produce(new LocalCodecSelectorTypesBuildItem(
selectorTypes.stream().map(Object::toString).collect(Collectors.toSet())));
}

private static final List<String> BUILT_IN_CODECS = Arrays.asList(
Expand Down Expand Up @@ -220,4 +247,14 @@ private static boolean hasBuiltInCodec(Type type) {
private static boolean isMessageClass(ParameterizedType type) {
return VertxConstants.isMessage(type.name());
}

private static boolean isConcreteClass(Type type, IndexView index) {
if (type != null && type.kind() == Kind.CLASS) {
ClassInfo clazz = index.getClassByName(type.name());
if (clazz != null) {
return !clazz.isInterface() && !Modifier.isAbstract(clazz.flags());
}
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.quarkus.vertx.deployment;

import java.util.Set;

import io.quarkus.builder.item.SimpleBuildItem;

/**
* Carries all types for which the {@link io.quarkus.vertx.LocalEventBusCodec} should be selected automatically.
*/
public final class LocalCodecSelectorTypesBuildItem extends SimpleBuildItem {

private final Set<String> types;

LocalCodecSelectorTypesBuildItem(Set<String> types) {
this.types = types;
}

public Set<String> getTypes() {
return types;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static io.quarkus.vertx.deployment.VertxConstants.isMessage;
import static io.quarkus.vertx.deployment.VertxConstants.isMessageHeaders;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -45,7 +46,6 @@
import io.quarkus.deployment.builditem.ShutdownContextBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
import io.quarkus.deployment.recording.RecorderContext;
import io.quarkus.gizmo.ClassOutput;
import io.quarkus.vertx.ConsumeEvent;
import io.quarkus.vertx.core.deployment.CoreVertxBuildItem;
Expand Down Expand Up @@ -74,7 +74,7 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec
BuildProducer<GeneratedClassBuildItem> generatedClass,
AnnotationProxyBuildItem annotationProxy, LaunchModeBuildItem launchMode, ShutdownContextBuildItem shutdown,
BuildProducer<ServiceStartBuildItem> serviceStart, BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
List<MessageCodecBuildItem> codecs, RecorderContext recorderContext) {
List<MessageCodecBuildItem> codecs, LocalCodecSelectorTypesBuildItem localCodecSelectorTypes) {
Map<String, ConsumeEvent> messageConsumerConfigurations = new HashMap<>();
ClassOutput classOutput = new GeneratedClassGizmoAdaptor(generatedClass, true);
for (EventConsumerBusinessMethodItem businessMethod : messageConsumerBusinessMethods) {
Expand All @@ -87,15 +87,20 @@ VertxBuildItem build(CoreVertxBuildItem vertx, VertxEventBusConsumerRecorder rec
reflectiveClass.produce(ReflectiveClassBuildItem.builder(invokerClass).build());
}

ClassLoader tccl = Thread.currentThread().getContextClassLoader();
Map<Class<?>, Class<?>> codecByClass = new HashMap<>();
for (MessageCodecBuildItem messageCodecItem : codecs) {
codecByClass.put(recorderContext.classProxy(messageCodecItem.getType()),
recorderContext.classProxy(messageCodecItem.getCodec()));
codecByClass.put(tryLoad(messageCodecItem.getType(), tccl), tryLoad(messageCodecItem.getCodec(), tccl));
}

List<Class<?>> selectorTypes = new ArrayList<>();
for (String name : localCodecSelectorTypes.getTypes()) {
selectorTypes.add(tryLoad(name, tccl));
}

recorder.configureVertx(vertx.getVertx(), messageConsumerConfigurations,
launchMode.getLaunchMode(),
shutdown, codecByClass);
shutdown, codecByClass, selectorTypes);
serviceStart.produce(new ServiceStartBuildItem("vertx"));
return new VertxBuildItem(recorder.forceStart(vertx.getVertx()));
}
Expand Down Expand Up @@ -190,4 +195,12 @@ void faultToleranceIntegration(Capabilities capabilities, BuildProducer<ServiceP
"io.smallrye.faulttolerance.vertx.VertxEventLoop"));
}
}

private Class<?> tryLoad(String name, ClassLoader tccl) {
try {
return tccl.loadClass(name);
} catch (ClassNotFoundException e) {
throw new IllegalStateException("Unable to load type: " + name, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package io.quarkus.vertx;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import java.util.function.Supplier;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -60,10 +63,10 @@ public void testWithUserCodec() {

@Test
public void testWithUserCodecNonLocal() {
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
String hello = vertx.eventBus().<String> request("nl-pet", new Pet("neo", "rabbit"))
.onItem().transform(Message::body)
.await().indefinitely();
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
assertEquals("Non Local Hello NEO", hello);
}

@Test
Expand All @@ -79,6 +82,20 @@ public void testWithSubclass() {
assertThat(hello.getMessage()).isEqualTo("Hello my-subclass-event");
}

@Test
public void testWithInterfaceCodecTarget() {
Supplier<String> supplier = vertx.eventBus()
.<Supplier<String>> request("hello-supplier", new Function<String, String>() {
@Override
public String apply(String value) {
return value.toLowerCase();
}
})
.onItem().transform(Message::body)
.await().indefinitely();
assertEquals("foo", supplier.get());
}

static class Greeting {
private final String message;

Expand Down Expand Up @@ -118,12 +135,23 @@ void messageTypeWithTypeAnnotation(@NonNull Person person) {
public CompletionStage<Greeting> hello(Event event) {
return CompletableFuture.completedFuture(new Greeting("Hello " + event.getProperty()));
}

@ConsumeEvent("hello-supplier")
public Supplier<String> helloSupplier(Function<String, String> fun) {
return new Supplier<String>() {

@Override
public String get() {
return fun.apply("FOO");
}
};
}
}

static class MyNonLocalBean {
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
public CompletionStage<Greeting> hello(Pet p) {
return CompletableFuture.completedFuture(new Greeting("Non Local Hello " + p.getName()));
public CompletionStage<String> hello(Pet p) {
return CompletableFuture.completedFuture("Non Local Hello " + p.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.quarkus.vertx;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import jakarta.inject.Inject;

Expand Down Expand Up @@ -54,10 +55,10 @@ public void testWithUserCodec() {

@Test
public void testWithUserCodecNonLocal() {
Greeting hello = vertx.eventBus().<Greeting> request("nl-pet", new Pet("neo", "rabbit"))
String hello = vertx.eventBus().<String> request("nl-pet", new Pet("neo", "rabbit"))
.onItem().transform(Message::body)
.await().indefinitely();
assertThat(hello.getMessage()).isEqualTo("Non Local Hello NEO");
assertEquals("Non Local Hello NEO", hello);
}

static class Greeting {
Expand Down Expand Up @@ -90,9 +91,9 @@ public Uni<Greeting> hello(Pet p) {

static class MyNonLocalBean {
@ConsumeEvent(value = "nl-pet", codec = MyPetCodec.class, local = false)
public Uni<Greeting> hello(Pet p) {
public Uni<String> hello(Pet p) {
return Uni.createFrom().item(
() -> new Greeting("Non Local Hello " + p.getName()))
() -> "Non Local Hello " + p.getName())
.emitOn(Infrastructure.getDefaultExecutor());
}
}
Expand Down
Loading
Loading