Skip to content

Commit

Permalink
Autodetect Kafka serializer/deserializer with Reactive Messaging
Browse files Browse the repository at this point in the history
  • Loading branch information
Ladicek committed Apr 27, 2021
1 parent 6ddda1f commit 932424b
Show file tree
Hide file tree
Showing 10 changed files with 2,549 additions and 11 deletions.
90 changes: 82 additions & 8 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -262,12 +262,12 @@ kafka.bootstrap.servers=localhost:9092
# Configure the Kafka sink (we write to it)
mp.messaging.outgoing.generated-price.connector=smallrye-kafka
mp.messaging.outgoing.generated-price.topic=prices
mp.messaging.incoming.prices.health-readiness-enabled=false
mp.messaging.outgoing.generated-price.value.serializer=org.apache.kafka.common.serialization.IntegerSerializer
# Configure the Kafka source (we read from it)
mp.messaging.incoming.prices.connector=smallrye-kafka
mp.messaging.incoming.prices.value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
mp.messaging.incoming.prices.health-readiness-enabled=false
----

More details about this configuration is available on the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] section from the Kafka documentation. These properties are configured with the prefix `kafka`.
Expand Down Expand Up @@ -495,6 +495,7 @@ To do this, we will need to setup JSON serialization with Jackson or JSON-B.

NOTE: With JSON serialization correctly configured, you can also use `Publisher<Fruit>` and `Emitter<Fruit>`.

[[jackson-serialization]]
=== Serializing via Jackson

First, you need to include the `quarkus-jackson` extension (if you already use the `quarkus-resteasy-jackson` extension, this is not needed).
Expand All @@ -507,9 +508,10 @@ First, you need to include the `quarkus-jackson` extension (if you already use t
</dependency>
----

There is an existing `ObjectMapperSerializer` that can be used to serialize all pojos via Jackson,
but the corresponding deserializer is generic, so it needs to be subclassed.
There is an existing `ObjectMapperSerializer` that can be used to serialize all pojos via Jackson.
You may create an empty subclass if you want to use <<serialization-autodetection>>.

The corresponding deserializer class needs to be subclassed.
So, let's create a `FruitDeserializer` that extends the `ObjectMapperDeserializer`.

[source,java]
Expand All @@ -519,7 +521,7 @@ package com.acme.fruit.jackson;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class FruitDeserializer extends ObjectMapperDeserializer<Fruit> {
public FruitDeserializer(){
public FruitDeserializer() {
// pass the class to the parent.
super(Fruit.class);
}
Expand All @@ -543,6 +545,7 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali

Now, your Kafka messages will contain a Jackson serialized representation of your Fruit pojo.

[[jsonb-serialization]]
=== Serializing via JSON-B

First, you need to include the `quarkus-jsonb` extension (if you already use the `quarkus-resteasy-jsonb` extension, this is not needed).
Expand All @@ -555,9 +558,10 @@ First, you need to include the `quarkus-jsonb` extension (if you already use the
</dependency>
----

There is an existing `JsonbSerializer` that can be used to serialize all pojos via JSON-B,
but the corresponding deserializer is generic, so it needs to be subclassed.
There is an existing `JsonbSerializer` that can be used to serialize all pojos via JSON-B.
You may create an empty subclass if you want to use <<serialization-autodetection>>.

The corresponding deserializer class needs to be subclassed.
So, let's create a `FruitDeserializer` that extends the generic `JsonbDeserializer`.

[source,java]
Expand All @@ -567,15 +571,15 @@ package com.acme.fruit.jsonb;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
public class FruitDeserializer extends JsonbDeserializer<Fruit> {
public FruitDeserializer(){
public FruitDeserializer() {
// pass the class to the parent.
super(Fruit.class);
}
}
----

NOTE: If you don't want to create a deserializer for each of your pojo, you can use the generic `io.vertx.kafka.client.serialization.JsonObjectDeserializer`
that will deserialize to a `javax.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`.
that will deserialize to a `io.vertx.core.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`.

Finally, configure your streams to use the JSON-B serializer and deserializer.

Expand Down Expand Up @@ -629,6 +633,76 @@ public class FruitResource {
}
----

== Avro serialization

This is described in a dedicated guide: link:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro].

[[serialization-autodetection]]
== Serializer/deserializer autodetection

When using SmallRye Reactive Messaging with Kafka, Quarkus can often automatically detect the correct serializer and deserializer class.
This autodetection is based on declarations of `@Incoming` and `@Outgoing` methods, as well as injected ``@Channel``s.

For example, if you declare

[source,java]
----
@Outgoing("generated-price")
public Multi<Integer> generate() {
...
}
----

and your configuration indicates that the `generated-price` channel uses the `smallrye-kafka` connector, then Quarkus will automatically set the `value.serializer` to Kafka's built-in `IntegerSerializer`.

Similarly, if you declare

[source,java]
----
@Incoming("my-kafka-records")
public void consume(KafkaRecord<Long, byte[]> record) {
...
}
----

and your configuration indicates that the `my-kafka-records` channel uses the `smallrye-kafka` connector, then Quarkus will automatically set the `key.deserializer` to Kafka's built-in `LongDeserializer`, as well as the `value.deserializer` to `ByteArrayDeserializer`.

Finally, if you declare

[source,java]
----
@Inject
@Channel("price-create")
Emitter<Double> priceEmitter;
----

and your configuration indicates that the `price-create` channel uses the `smallrye-kafka` connector, then Quarkus will automatically set the `value.serializer` to Kafka's built-in `DoubleSerializer`.

The full set of types supported by the serializer/deserializer autodetection is:

* `short` and `java.lang.Short`
* `int` and `java.lang.Integer`
* `long` and `java.lang.Long`
* `float` and `java.lang.Float`
* `double` and `java.lang.Double`
* `byte[]`
* `java.lang.String`
* `java.util.UUID`
* `java.nio.ByteBuffer`
* `org.apache.kafka.common.utils.Bytes`
* `io.vertx.core.buffer.Buffer`
* `io.vertx.core.json.JsonObject`
* `io.vertx.core.json.JsonArray`
* classes generated from Avro schemata, if Confluent or Apicurio _serde_ is present
** see link:kafka-schema-registry-avro.adoc[Using Apache Kafka with Schema Registry and Avro] for more information about using Confluent or Apicurio libraries
* classes for which a subclass of `ObjectMapperSerializer` / `ObjectMapperDeserializer` is present, as described in <<jackson-serialization>>
** it is technically not needed to subclass `ObjectMapperSerializer`, but in such case, autodetection isn't possible
* classes for which a subclass of `JsonbSerializer` / `JsonbDeserializer` is present, as described in <<jsonb-serialization>>
** it is technically not needed to subclass `JsonbSerializer`, but in such case, autodetection isn't possible

In case you have any issues with serializer autodetection, you can switch it off completely by setting `quarkus.reactive-messaging.kafka.serializer-autodetection.enabled=false`.
If you find you need to do this, please file a bug in the link:https://github.com/quarkusio/quarkus/issues[Quarkus issue tracker] so we can fix whatever problem you have.

== Blocking processing

You often need to combine Reactive Messaging with blocking processing such as database interactions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,6 @@ private void handleAvro(BuildProducer<ReflectiveClassBuildItem> reflectiveClass,
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.DefaultIdHandler",
"io.apicurio.registry.serde.Legacy4ByteIdHandler",
"io.apicurio.registry.serde.DefaultSchemaResolver",
"io.apicurio.registry.serde.fallback.DefaultFallbackArtifactProvider",
"io.apicurio.registry.serde.headers.DefaultHeadersHandler"));

Expand Down
26 changes: 26 additions & 0 deletions extensions/smallrye-reactive-messaging-kafka/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,32 @@
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-avro-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonb-deployment</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-serdes-avro-serde</artifactId>
<version>2.0.0.Final</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.jandex.AnnotationInstance;
import org.jboss.jandex.AnnotationTarget;
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.Type;

import io.smallrye.reactive.messaging.kafka.KafkaConnector;

class DefaultSerdeDiscoveryState {
private final IndexView index;

private final Map<String, Boolean> isKafkaConnector = new HashMap<>();

private Boolean hasConfluent;
private Boolean hasApicurio1;
private Boolean hasApicurio2;
private Boolean hasJackson;
private Boolean hasJsonb;

DefaultSerdeDiscoveryState(IndexView index) {
this.index = index;
}

boolean isKafkaConnector(boolean incoming, String channelName) {
String channelType = incoming ? "incoming" : "outgoing";
return isKafkaConnector.computeIfAbsent(channelType + "|" + channelName, ignored -> {
String connectorKey = "mp.messaging." + channelType + "." + channelName + ".connector";
String connector = ConfigProvider.getConfig()
.getOptionalValue(connectorKey, String.class)
.orElse("ignored");
return KafkaConnector.CONNECTOR_NAME.equals(connector);
});
}

boolean isAvroGenerated(DotName className) {
ClassInfo clazz = index.getClassByName(className);
return clazz != null && clazz.classAnnotation(DotNames.AVRO_GENERATED) != null;
}

boolean hasConfluent() {
if (hasConfluent == null) {
try {
Class.forName("io.confluent.kafka.serializers.KafkaAvroDeserializer", false,
Thread.currentThread().getContextClassLoader());
hasConfluent = true;
} catch (ClassNotFoundException e) {
hasConfluent = false;
}
}

return hasConfluent;
}

boolean hasApicurio1() {
if (hasApicurio1 == null) {
try {
Class.forName("io.apicurio.registry.utils.serde.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
hasApicurio1 = true;
} catch (ClassNotFoundException e) {
hasApicurio1 = false;
}
}

return hasApicurio1;
}

boolean hasApicurio2() {
if (hasApicurio2 == null) {
try {
Class.forName("io.apicurio.registry.serde.avro.AvroKafkaDeserializer", false,
Thread.currentThread().getContextClassLoader());
hasApicurio2 = true;
} catch (ClassNotFoundException e) {
hasApicurio2 = false;
}
}

return hasApicurio2;
}

boolean hasJackson() {
if (hasJackson == null) {
try {
Class.forName("com.fasterxml.jackson.databind.ObjectMapper", false,
Thread.currentThread().getContextClassLoader());
hasJackson = true;
} catch (ClassNotFoundException e) {
hasJackson = false;
}
}

return hasJackson;
}

boolean hasJsonb() {
if (hasJsonb == null) {
try {
Class.forName("javax.json.bind.Jsonb", false,
Thread.currentThread().getContextClassLoader());
hasJsonb = true;
} catch (ClassNotFoundException e) {
hasJsonb = false;
}
}

return hasJsonb;
}

ClassInfo getSubclassOfWithTypeArgument(DotName superclass, DotName expectedTypeArgument) {
return index.getKnownDirectSubclasses(superclass)
.stream()
.filter(it -> it.superClassType().kind() == Type.Kind.PARAMETERIZED_TYPE
&& it.superClassType().asParameterizedType().arguments().size() == 1
&& it.superClassType().asParameterizedType().arguments().get(0).name().equals(expectedTypeArgument))
.findAny()
.orElse(null);
}

List<AnnotationInstance> findAnnotationsOnMethods(DotName annotation) {
return index.getAnnotations(annotation)
.stream()
.filter(it -> it.target().kind() == AnnotationTarget.Kind.METHOD)
.collect(Collectors.toList());
}

List<AnnotationInstance> findAnnotationsOnInjectionPoints(DotName annotation) {
return index.getAnnotations(annotation)
.stream()
.filter(it -> it.target().kind() == AnnotationTarget.Kind.FIELD
|| it.target().kind() == AnnotationTarget.Kind.METHOD_PARAMETER)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import org.jboss.jandex.DotName;

final class DotNames {
// @formatter:off
static final DotName INCOMING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Incoming.class.getName());
static final DotName OUTGOING = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Outgoing.class.getName());
static final DotName CHANNEL = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Channel.class.getName());

static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());

static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());
static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName());
static final DotName RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.Record.class.getName());

static final DotName COMPLETION_STAGE = DotName.createSimple(java.util.concurrent.CompletionStage.class.getName());
static final DotName UNI = DotName.createSimple(io.smallrye.mutiny.Uni.class.getName());

static final DotName SUBSCRIBER = DotName.createSimple(org.reactivestreams.Subscriber.class.getName());
static final DotName SUBSCRIBER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder.class.getName());
static final DotName PUBLISHER = DotName.createSimple(org.reactivestreams.Publisher.class.getName());
static final DotName PUBLISHER_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder.class.getName());
static final DotName PROCESSOR = DotName.createSimple(org.reactivestreams.Processor.class.getName());
static final DotName PROCESSOR_BUILDER = DotName.createSimple(org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder.class.getName());
static final DotName MULTI = DotName.createSimple(io.smallrye.mutiny.Multi.class.getName());

static final DotName AVRO_GENERATED = DotName.createSimple("org.apache.avro.specific.AvroGenerated");
static final DotName OBJECT_MAPPER_DESERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.ObjectMapperDeserializer.class.getName());
static final DotName OBJECT_MAPPER_SERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.ObjectMapperSerializer.class.getName());
static final DotName JSONB_DESERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.JsonbDeserializer.class.getName());
static final DotName JSONB_SERIALIZER = DotName.createSimple(io.quarkus.kafka.client.serialization.JsonbSerializer.class.getName());
// @formatter:on
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.quarkus.smallrye.reactivemessaging.kafka.deployment;

import io.quarkus.runtime.annotations.ConfigItem;
import io.quarkus.runtime.annotations.ConfigPhase;
import io.quarkus.runtime.annotations.ConfigRoot;

@ConfigRoot(name = "reactive-messaging.kafka", phase = ConfigPhase.BUILD_TIME)
public class ReactiveMessagingKafkaBuildTimeConfig {
/**
* Whether or not Kafka serializer/deserializer autodetection is enabled.
*/
@ConfigItem(name = "serializer-autodetection.enabled", defaultValue = "true")
public boolean serializerAutodetectionEnabled;
}
Loading

0 comments on commit 932424b

Please sign in to comment.