diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferDeserializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferDeserializer.java new file mode 100644 index 0000000000000..d2d36cb168ee2 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferDeserializer.java @@ -0,0 +1,20 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Deserializer; + +import io.vertx.core.buffer.Buffer; + +/** + * Kafka deserializer for raw bytes in a buffer + */ +public class BufferDeserializer implements Deserializer { + + @Override + public Buffer deserialize(String topic, byte[] data) { + if (data == null) + return null; + + return Buffer.buffer(data); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferSerde.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferSerde.java new file mode 100644 index 0000000000000..1700f875d04dc --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferSerde.java @@ -0,0 +1,13 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Serdes; + +import io.vertx.core.buffer.Buffer; + +public final class BufferSerde extends Serdes.WrapperSerde { + + public BufferSerde() { + super(new BufferSerializer(), new BufferDeserializer()); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferSerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferSerializer.java new file mode 100644 index 0000000000000..1db46365d3bc4 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/BufferSerializer.java @@ -0,0 +1,19 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Serializer; + +import io.vertx.core.buffer.Buffer; + +/** + * Kafka serializer for raw bytes in a buffer + */ +public class BufferSerializer implements Serializer { + + @Override + public byte[] serialize(String topic, Buffer data) { + if (data == null) + return null; + + return data.getBytes(); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArrayDeserializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArrayDeserializer.java new file mode 100644 index 0000000000000..70ec5ca5b7702 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArrayDeserializer.java @@ -0,0 +1,21 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Deserializer; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonArray; + +/** + * Kafka deserializer for raw bytes in a buffer + */ +public class JsonArrayDeserializer implements Deserializer { + + @Override + public JsonArray deserialize(String topic, byte[] data) { + if (data == null) + return null; + + return Buffer.buffer(data).toJsonArray(); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArraySerde.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArraySerde.java new file mode 100644 index 0000000000000..66a56d06eec8d --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArraySerde.java @@ -0,0 +1,11 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Serdes; + +import io.vertx.core.json.JsonArray; + +public final class JsonArraySerde extends Serdes.WrapperSerde { + public JsonArraySerde() { + super(new JsonArraySerializer(), new JsonArrayDeserializer()); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArraySerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArraySerializer.java new file mode 100644 index 0000000000000..499d9fc845a9f --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonArraySerializer.java @@ -0,0 +1,20 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Serializer; + +import io.vertx.core.json.JsonArray; + +/** + * Kafka serializer for raw bytes in a buffer + */ +public class JsonArraySerializer implements Serializer { + + @Override + public byte[] serialize(String topic, JsonArray data) { + if (data == null) + return null; + + return data.encode().getBytes(); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectDeserializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectDeserializer.java new file mode 100644 index 0000000000000..a5091bfe147b9 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectDeserializer.java @@ -0,0 +1,21 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Deserializer; + +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; + +/** + * Kafka deserializer for raw bytes in a buffer + */ +public class JsonObjectDeserializer implements Deserializer { + + @Override + public JsonObject deserialize(String topic, byte[] data) { + if (data == null) + return null; + + return Buffer.buffer(data).toJsonObject(); + } + +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectSerde.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectSerde.java new file mode 100644 index 0000000000000..106419a828b2c --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectSerde.java @@ -0,0 +1,11 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Serdes; + +import io.vertx.core.json.JsonObject; + +public final class JsonObjectSerde extends Serdes.WrapperSerde { + public JsonObjectSerde() { + super(new JsonObjectSerializer(), new JsonObjectDeserializer()); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectSerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectSerializer.java new file mode 100644 index 0000000000000..3bf266cc27b5a --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonObjectSerializer.java @@ -0,0 +1,20 @@ +package io.quarkus.kafka.client.serialization; + +import org.apache.kafka.common.serialization.Serializer; + +import io.vertx.core.json.JsonObject; + +/** + * Kafka serializer for raw bytes in a buffer + */ +public class JsonObjectSerializer implements Serializer { + + @Override + public byte[] serialize(String topic, JsonObject data) { + if (data == null) + return null; + + return data.encode().getBytes(); + } + +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/BufferDeserializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/BufferDeserializerTest.java new file mode 100644 index 0000000000000..8abf4f923dd71 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/BufferDeserializerTest.java @@ -0,0 +1,32 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.buffer.Buffer; + +public class BufferDeserializerTest { + + @Test + void shouldDeserializeEntity() { + BufferDeserializer deserializer = new BufferDeserializer(); + Buffer actual = deserializer.deserialize("topic", "some-bytes".getBytes()); + assertEquals(Buffer.buffer("some-bytes"), actual); + } + + @Test + void shouldDeserializeNullAsNullString() { + BufferDeserializer deserializer = new BufferDeserializer(); + Buffer actual = deserializer.deserialize("topic", "null".getBytes()); + assertEquals(Buffer.buffer("null"), actual); + } + + @Test + void shouldDeserializeNullAsNull() { + BufferDeserializer deserializer = new BufferDeserializer(); + Buffer actual = deserializer.deserialize("topic", null); + assertNull(actual); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/BufferSerializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/BufferSerializerTest.java new file mode 100644 index 0000000000000..77315345bf787 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/BufferSerializerTest.java @@ -0,0 +1,24 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.buffer.Buffer; + +public class BufferSerializerTest { + @Test + void shouldSerializeEntity() { + BufferSerializer serializer = new BufferSerializer(); + byte[] actual = serializer.serialize("topic", Buffer.buffer("some-bytes")); + assertNotNull(actual); + } + + @Test + void shouldSerializeNullAsNull() { + BufferSerializer serializer = new BufferSerializer(); + byte[] result = serializer.serialize("topic", null); + assertNull(result); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonArrayDeserializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonArrayDeserializerTest.java new file mode 100644 index 0000000000000..cdb0a1d367005 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonArrayDeserializerTest.java @@ -0,0 +1,44 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.JsonArray; + +public class JsonArrayDeserializerTest { + @Test + void shouldDeserializeEntity() { + JsonArray expected = new JsonArray(List.of( + Map.of("id", 1, "name", "entity1"), + Map.of("id", 2, "name", "entity2"))); + JsonArrayDeserializer deserializer = new JsonArrayDeserializer(); + String actualString = "[" + + "{\"id\":1,\"name\":\"entity1\"}," + + "{\"id\":2,\"name\":\"entity2\"}" + + "]"; + JsonArray actual = deserializer.deserialize("topic", actualString.getBytes()); + assertNotNull(actual); + assertEquals(expected, actual); + } + + @Test + void shouldThrowDecodeExceptionOnDeserializeNull() { + JsonArrayDeserializer deserializer = new JsonArrayDeserializer(); + assertThrows(DecodeException.class, () -> deserializer.deserialize("topic", "null".getBytes())); + } + + @Test + void shouldDeserializeNullAsNull() { + JsonArrayDeserializer deserializer = new JsonArrayDeserializer(); + JsonArray actual = deserializer.deserialize("topic", null); + assertNull(actual); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonArraySerializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonArraySerializerTest.java new file mode 100644 index 0000000000000..9a9b7afedeffc --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonArraySerializerTest.java @@ -0,0 +1,35 @@ +package io.quarkus.kafka.client.serialization; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.json.JsonArray; + +public class JsonArraySerializerTest { + + @Test + void shouldSerializeEntity() { + JsonArraySerializer serializer = new JsonArraySerializer(); + byte[] result = serializer.serialize("topic", new JsonArray(List.of( + Map.of("id", 1, "name", "entity1"), + Map.of("id", 2, "name", "entity2")))); + assertNotNull(result); + String actual = new String(result); + assertThat(actual) + .contains("\"id\"").contains("\"name\"").contains("\"entity1\"").contains("1") + .contains("\"entity2\"").contains("2"); + } + + @Test + void shouldSerializeNullAsNull() { + JsonArraySerializer serializer = new JsonArraySerializer(); + byte[] result = serializer.serialize("topic", null); + assertNull(result); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonObjectDeserializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonObjectDeserializerTest.java new file mode 100644 index 0000000000000..e0086c948abed --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonObjectDeserializerTest.java @@ -0,0 +1,38 @@ +package io.quarkus.kafka.client.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.json.DecodeException; +import io.vertx.core.json.JsonObject; + +public class JsonObjectDeserializerTest { + + @Test + void shouldDeserializeEntity() { + JsonObject expected = new JsonObject(Map.of("id", 1, "name", "entity1")); + JsonObjectDeserializer deserializer = new JsonObjectDeserializer(); + JsonObject actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes()); + assertNotNull(actual); + assertEquals(expected, actual); + } + + @Test + void shouldThrowDecodeExceptionOnDeserializeNull() { + JsonObjectDeserializer deserializer = new JsonObjectDeserializer(); + assertThrows(DecodeException.class, () -> deserializer.deserialize("topic", "null".getBytes())); + } + + @Test + void shouldDeserializeNullAsNull() { + JsonObjectDeserializer deserializer = new JsonObjectDeserializer(); + JsonObject actual = deserializer.deserialize("topic", null); + assertNull(actual); + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonObjectSerializerTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonObjectSerializerTest.java new file mode 100644 index 0000000000000..8e3a430bd250c --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serialization/JsonObjectSerializerTest.java @@ -0,0 +1,30 @@ +package io.quarkus.kafka.client.serialization; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import io.vertx.core.json.JsonObject; + +public class JsonObjectSerializerTest { + + @Test + void shouldSerializeJsonObject() { + JsonObjectSerializer serializer = new JsonObjectSerializer(); + byte[] result = serializer.serialize("topic", new JsonObject(Map.of("id", 1, "name", "entity1"))); + assertNotNull(result); + String actual = new String(result); + assertThat(actual).contains("\"id\"").contains("\"name\"").contains("\"entity1\"").contains("1"); + } + + @Test + void shouldSerializeNullAsNull() { + JsonObjectSerializer serializer = new JsonObjectSerializer(); + byte[] result = serializer.serialize("topic", null); + assertNull(result); + } +} diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/KafkaCodecProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/KafkaCodecProcessor.java index b109cc4a3da66..b61d0c5a870ff 100644 --- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/KafkaCodecProcessor.java +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/KafkaCodecProcessor.java @@ -1,8 +1,15 @@ package io.quarkus.smallrye.reactivemessaging.kafka.deployment; +import org.objectweb.asm.ClassVisitor; +import org.objectweb.asm.MethodVisitor; +import org.objectweb.asm.Opcodes; + import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.builditem.BytecodeTransformerBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.gizmo.Gizmo; +import io.quarkus.smallrye.reactivemessaging.kafka.KafkaCodecDependencyRemovalLogger; import io.vertx.kafka.client.serialization.BufferDeserializer; import io.vertx.kafka.client.serialization.BufferSerializer; import io.vertx.kafka.client.serialization.JsonArrayDeserializer; @@ -28,4 +35,41 @@ public void build(BuildProducer reflectiveClass) { reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, s.getName())); } } + + @BuildStep + public void deprecateVertxProvidedSerde(BuildProducer producer) { + for (Class vertxSerdeClass : BUILT_INS) { + producer.produce(new BytecodeTransformerBuildItem(vertxSerdeClass.getName(), + KafkaCodecDeprecateClassVisitor::new)); + } + } + + private static class KafkaCodecDeprecateClassVisitor extends ClassVisitor { + + private final String fqcn; + + protected KafkaCodecDeprecateClassVisitor(String fqcn, ClassVisitor classVisitor) { + super(Gizmo.ASM_API_VERSION, classVisitor); + this.fqcn = fqcn; + } + + @Override + public MethodVisitor visitMethod(int access, String name, String descriptor, String signature, String[] exceptions) { + MethodVisitor methodVisitor = super.visitMethod(access, name, descriptor, signature, exceptions); + if (name.equals("")) { + return new MethodVisitor(Gizmo.ASM_API_VERSION, methodVisitor) { + @Override + public void visitCode() { + super.visitCode(); + visitLdcInsn(fqcn); // load fqcn as constant + visitMethodInsn(Opcodes.INVOKESTATIC, + KafkaCodecDependencyRemovalLogger.class.getName().replace(".", "/"), "logRemovedDependency", + "(Ljava/lang/String;)V", false); + } + }; + } else { + return methodVisitor; + } + } + } } diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/VertxSerdeRemovedTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/VertxSerdeRemovedTest.java new file mode 100644 index 0000000000000..2b04458ad040f --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/VertxSerdeRemovedTest.java @@ -0,0 +1,36 @@ +package io.quarkus.smallrye.reactivemessaging.kafka.deployment; + +import static org.assertj.core.api.Assertions.assertThat; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.vertx.core.json.JsonObject; + +public class VertxSerdeRemovedTest { + + @RegisterExtension + static final QuarkusUnitTest TEST = new QuarkusUnitTest() + .withApplicationRoot(jar -> jar.addClass(IncomingChannel.class)); + + @ApplicationScoped + public static class IncomingChannel { + + @Incoming("in") + public void consume(JsonObject payload) { + // no-op + } + } + + @Test + void test_logged_depreciation_notice() { + TEST.assertLogRecords(records -> { + assertThat(records).anyMatch(log -> log.getLoggerName().contains("KafkaCodecDependencyRemovalLogger") && + log.getMessage().contains("io.quarkus.kafka.client.serialization.JsonObjectDeserializer")); + }); + } +} diff --git a/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaCodecDependencyRemovalLogger.java b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaCodecDependencyRemovalLogger.java new file mode 100644 index 0000000000000..56c6374beff1e --- /dev/null +++ b/extensions/smallrye-reactive-messaging-kafka/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/KafkaCodecDependencyRemovalLogger.java @@ -0,0 +1,20 @@ +package io.quarkus.smallrye.reactivemessaging.kafka; + +import org.jboss.logging.Logger; + +public class KafkaCodecDependencyRemovalLogger { + + private static final Logger LOGGER = Logger.getLogger(KafkaCodecDependencyRemovalLogger.class.getName()); + + private static final String TARGET_SERDE_PACKAGE = "io.vertx.kafka.client.serialization"; + private static final String REPLACEMENT_SERDE_PACKAGE = "io.quarkus.kafka.client.serialization"; + + public static void logDependencyRemoval(String deprecatedClassName) { + LOGGER.warnf("Dependency to be removed: The Serde class `%s` will no longer be included " + + "in the classpath of the Smallrye Reactive Messaging Kafka extension. " + + "Consider replacing it's usage with `%s` provided by the Kafka extension " + + "or including the Vert.x Kafka client dependency (io.vertx:vertx-kafka-client) yourself.", + deprecatedClassName, deprecatedClassName.replace(TARGET_SERDE_PACKAGE, REPLACEMENT_SERDE_PACKAGE)); + } + +}