Skip to content

Commit

Permalink
Merge pull request #26751 from ozangunalp/kafka_vertx_client_serde_de…
Browse files Browse the repository at this point in the history
…preciation

Add dependency removal notice for Kafka Serde provided by Vert.x kafka client
  • Loading branch information
cescoffier authored Jul 18, 2022
2 parents c328e8e + 908c144 commit 7519aec
Show file tree
Hide file tree
Showing 19 changed files with 476 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@
import io.quarkus.kafka.client.runtime.KafkaBindingConverter;
import io.quarkus.kafka.client.runtime.KafkaRecorder;
import io.quarkus.kafka.client.runtime.KafkaRuntimeConfigProducer;
import io.quarkus.kafka.client.serialization.BufferDeserializer;
import io.quarkus.kafka.client.serialization.BufferSerializer;
import io.quarkus.kafka.client.serialization.JsonArrayDeserializer;
import io.quarkus.kafka.client.serialization.JsonArraySerializer;
import io.quarkus.kafka.client.serialization.JsonObjectDeserializer;
import io.quarkus.kafka.client.serialization.JsonObjectSerializer;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
Expand All @@ -104,6 +110,10 @@ public class KafkaProcessor {
ByteBufferSerializer.class,
StringSerializer.class,
FloatSerializer.class,
// Provided in extension
JsonObjectSerializer.class,
JsonArraySerializer.class,
BufferSerializer.class,

//deserializers
ShortDeserializer.class,
Expand All @@ -114,7 +124,11 @@ public class KafkaProcessor {
IntegerDeserializer.class,
ByteBufferDeserializer.class,
StringDeserializer.class,
FloatDeserializer.class
FloatDeserializer.class,
// Provided in extension
JsonObjectDeserializer.class,
JsonArrayDeserializer.class,
BufferDeserializer.class
};

static final DotName OBJECT_MAPPER = DotName.createSimple("com.fasterxml.jackson.databind.ObjectMapper");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Deserializer;

import io.vertx.core.buffer.Buffer;

/**
* Kafka deserializer for raw bytes in a buffer
*/
public class BufferDeserializer implements Deserializer<Buffer> {

@Override
public Buffer deserialize(String topic, byte[] data) {
if (data == null)
return null;

return Buffer.buffer(data);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Serdes;

import io.vertx.core.buffer.Buffer;

public final class BufferSerde extends Serdes.WrapperSerde<Buffer> {

public BufferSerde() {
super(new BufferSerializer(), new BufferDeserializer());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Serializer;

import io.vertx.core.buffer.Buffer;

/**
* Kafka serializer for raw bytes in a buffer
*/
public class BufferSerializer implements Serializer<Buffer> {

@Override
public byte[] serialize(String topic, Buffer data) {
if (data == null)
return null;

return data.getBytes();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Deserializer;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonArray;

/**
* Kafka deserializer for raw bytes in a buffer
*/
public class JsonArrayDeserializer implements Deserializer<JsonArray> {

@Override
public JsonArray deserialize(String topic, byte[] data) {
if (data == null)
return null;

return Buffer.buffer(data).toJsonArray();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Serdes;

import io.vertx.core.json.JsonArray;

public final class JsonArraySerde extends Serdes.WrapperSerde<JsonArray> {
public JsonArraySerde() {
super(new JsonArraySerializer(), new JsonArrayDeserializer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Serializer;

import io.vertx.core.json.JsonArray;

/**
* Kafka serializer for raw bytes in a buffer
*/
public class JsonArraySerializer implements Serializer<JsonArray> {

@Override
public byte[] serialize(String topic, JsonArray data) {
if (data == null)
return null;

return data.encode().getBytes();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Deserializer;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;

/**
* Kafka deserializer for raw bytes in a buffer
*/
public class JsonObjectDeserializer implements Deserializer<JsonObject> {

@Override
public JsonObject deserialize(String topic, byte[] data) {
if (data == null)
return null;

return Buffer.buffer(data).toJsonObject();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Serdes;

import io.vertx.core.json.JsonObject;

public final class JsonObjectSerde extends Serdes.WrapperSerde<JsonObject> {
public JsonObjectSerde() {
super(new JsonObjectSerializer(), new JsonObjectDeserializer());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.kafka.client.serialization;

import org.apache.kafka.common.serialization.Serializer;

import io.vertx.core.json.JsonObject;

/**
* Kafka serializer for raw bytes in a buffer
*/
public class JsonObjectSerializer implements Serializer<JsonObject> {

@Override
public byte[] serialize(String topic, JsonObject data) {
if (data == null)
return null;

return data.encode().getBytes();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.Test;

import io.vertx.core.buffer.Buffer;

public class BufferDeserializerTest {

@Test
void shouldDeserializeEntity() {
BufferDeserializer deserializer = new BufferDeserializer();
Buffer actual = deserializer.deserialize("topic", "some-bytes".getBytes());
assertEquals(Buffer.buffer("some-bytes"), actual);
}

@Test
void shouldDeserializeNullAsNullString() {
BufferDeserializer deserializer = new BufferDeserializer();
Buffer actual = deserializer.deserialize("topic", "null".getBytes());
assertEquals(Buffer.buffer("null"), actual);
}

@Test
void shouldDeserializeNullAsNull() {
BufferDeserializer deserializer = new BufferDeserializer();
Buffer actual = deserializer.deserialize("topic", null);
assertNull(actual);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import org.junit.jupiter.api.Test;

import io.vertx.core.buffer.Buffer;

public class BufferSerializerTest {
@Test
void shouldSerializeEntity() {
BufferSerializer serializer = new BufferSerializer();
byte[] actual = serializer.serialize("topic", Buffer.buffer("some-bytes"));
assertNotNull(actual);
}

@Test
void shouldSerializeNullAsNull() {
BufferSerializer serializer = new BufferSerializer();
byte[] result = serializer.serialize("topic", null);
assertNull(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonArray;

public class JsonArrayDeserializerTest {
@Test
void shouldDeserializeEntity() {
JsonArray expected = new JsonArray(List.of(
Map.of("id", 1, "name", "entity1"),
Map.of("id", 2, "name", "entity2")));
JsonArrayDeserializer deserializer = new JsonArrayDeserializer();
String actualString = "[" +
"{\"id\":1,\"name\":\"entity1\"}," +
"{\"id\":2,\"name\":\"entity2\"}" +
"]";
JsonArray actual = deserializer.deserialize("topic", actualString.getBytes());
assertNotNull(actual);
assertEquals(expected, actual);
}

@Test
void shouldThrowDecodeExceptionOnDeserializeNull() {
JsonArrayDeserializer deserializer = new JsonArrayDeserializer();
assertThrows(DecodeException.class, () -> deserializer.deserialize("topic", "null".getBytes()));
}

@Test
void shouldDeserializeNullAsNull() {
JsonArrayDeserializer deserializer = new JsonArrayDeserializer();
JsonArray actual = deserializer.deserialize("topic", null);
assertNull(actual);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.quarkus.kafka.client.serialization;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

import io.vertx.core.json.JsonArray;

public class JsonArraySerializerTest {

@Test
void shouldSerializeEntity() {
JsonArraySerializer serializer = new JsonArraySerializer();
byte[] result = serializer.serialize("topic", new JsonArray(List.of(
Map.of("id", 1, "name", "entity1"),
Map.of("id", 2, "name", "entity2"))));
assertNotNull(result);
String actual = new String(result);
assertThat(actual)
.contains("\"id\"").contains("\"name\"").contains("\"entity1\"").contains("1")
.contains("\"entity2\"").contains("2");
}

@Test
void shouldSerializeNullAsNull() {
JsonArraySerializer serializer = new JsonArraySerializer();
byte[] result = serializer.serialize("topic", null);
assertNull(result);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.Map;

import org.junit.jupiter.api.Test;

import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;

public class JsonObjectDeserializerTest {

@Test
void shouldDeserializeEntity() {
JsonObject expected = new JsonObject(Map.of("id", 1, "name", "entity1"));
JsonObjectDeserializer deserializer = new JsonObjectDeserializer();
JsonObject actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes());
assertNotNull(actual);
assertEquals(expected, actual);
}

@Test
void shouldThrowDecodeExceptionOnDeserializeNull() {
JsonObjectDeserializer deserializer = new JsonObjectDeserializer();
assertThrows(DecodeException.class, () -> deserializer.deserialize("topic", "null".getBytes()));
}

@Test
void shouldDeserializeNullAsNull() {
JsonObjectDeserializer deserializer = new JsonObjectDeserializer();
JsonObject actual = deserializer.deserialize("topic", null);
assertNull(actual);
}
}
Loading

0 comments on commit 7519aec

Please sign in to comment.