Skip to content

Commit

Permalink
Merge pull request quarkusio#19543 from loicmathieu/kafka-serializer-…
Browse files Browse the repository at this point in the history
…null-handling

Kafka serializer null handling
  • Loading branch information
cescoffier authored Aug 22, 2021
2 parents 6d75e13 + 1869932 commit 8445d00
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 0 deletions.
45 changes: 45 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,10 @@ First, you need to include the `quarkus-jackson` extension.
There is an existing `ObjectMapperSerializer` that can be used to serialize all data objects via Jackson.
You may create an empty subclass if you want to use <<serialization-autodetection>>.

NOTE: By default, the `ObjectMapperSerializer` serializes null as the `"null"` String, this can be customized by setting the Kafka configuration
property `json.serialize.null-as-null=true` which will serialize null as `null`.
This is handy when using a compacted topic, as `null` is used as a tombstone to know which messages delete during compaction phase.

The corresponding deserializer class needs to be subclassed.
So, let's create a `FruitDeserializer` that extends the `ObjectMapperDeserializer`.

Expand Down Expand Up @@ -1150,6 +1154,25 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali
Now, your Kafka messages will contain a Jackson serialized representation of your `Fruit` data object.
In this case, the `deserializer` configuration is not necessary as the <<serialization-autodetection>> is enabled by default.

If you want to deserialize a list of fruits, you need to create a deserializer with a Jackson `TypeReference` denoted the generic collection used.

[source,java]
----
package com.acme.fruit.jackson;
import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
public ListOfFruitDeserializer() {
TypeReference<List<Fruit>> listType = new TypeReference<>() {
};
super(listType);
}
}
----

[[jsonb-serialization]]
=== Serializing via JSON-B

Expand All @@ -1166,6 +1189,10 @@ First, you need to include the `quarkus-jsonb` extension.
There is an existing `JsonbSerializer` that can be used to serialize all data objects via JSON-B.
You may create an empty subclass if you want to use <<serialization-autodetection>>.

NOTE: By default, the `JsonbSerializer` serializes null as the `"null"` String, this can be customized by setting the Kafka configuration
property `json.serialize.null-as-null=true` which will serialize null as `null`.
This is handy when using a compacted topic, as `null` is used as a tombstone to know which messages delete during compaction phase.

The corresponding deserializer class needs to be subclassed.
So, let's create a `FruitDeserializer` that extends the generic `JsonbDeserializer`.

Expand Down Expand Up @@ -1199,6 +1226,24 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali

Now, your Kafka messages will contain a JSON-B serialized representation of your `Fruit` data object.

If you want to deserialize a list of fruits, you need to create a deserializer with a `Type` denoted the generic collection used.

[source,java]
----
package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;
public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
public ListOfFruitDeserializer() {
Type listType = new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass();
super(listType);
}
}
----

NOTE: If you don't want to create a deserializer for each data object, you can use the generic `io.vertx.kafka.client.serialization.JsonObjectDeserializer`
that will deserialize to a `io.vertx.core.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
* A {@link Serializer} that serializes to JSON using JSON-B.
*/
public class JsonbSerializer<T> implements Serializer<T> {
public static final String NULL_AS_NULL_CONFIG = "json.serialize.null-as-null";

private final Jsonb jsonb;
private final boolean jsonbNeedsClosing;

private boolean nullAsNull = false;

public JsonbSerializer() {
this(JsonbProducer.get(), true);
}
Expand All @@ -31,10 +34,17 @@ private JsonbSerializer(Jsonb jsonb, boolean jsonbNeedsClosing) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (configs.containsKey(NULL_AS_NULL_CONFIG) && Boolean.parseBoolean((String) configs.get(NULL_AS_NULL_CONFIG))) {
nullAsNull = true;
}
}

@Override
public byte[] serialize(String topic, T data) {
if (nullAsNull && data == null) {
return null;
}

try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
jsonb.toJson(data, output);
return output.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
* A {@link Deserializer} that deserializes JSON using Jackson's ObjectMapper.
*/
public class ObjectMapperSerializer<T> implements Serializer<T> {
public static final String NULL_AS_NULL_CONFIG = "json.serialize.null-as-null";

private final ObjectMapper objectMapper;

private boolean nullAsNull = false;

public ObjectMapperSerializer() {
this(ObjectMapperProducer.get());
}
Expand All @@ -26,10 +29,17 @@ public ObjectMapperSerializer(ObjectMapper objectMapper) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (configs.containsKey(NULL_AS_NULL_CONFIG) && Boolean.parseBoolean((String) configs.get(NULL_AS_NULL_CONFIG))) {
nullAsNull = true;
}
}

@Override
public byte[] serialize(String topic, T data) {
if (nullAsNull && data == null) {
return null;
}

try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
objectMapper.writeValue(output, data);
return output.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Test;

class JsonbDeserializerTest {
@Test
void shouldDeserializeEntity() {
MyEntity expected = new MyEntity(1, "entity1");
JsonbDeserializer<MyEntity> deserializer = new JsonbDeserializer<>(MyEntity.class);
MyEntity actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes());
assertNotNull(actual);
assertEquals(expected, actual);
}

@Test
void shouldDeserializeListOfEntities() {
Type listType = new ArrayList<MyEntity>() {
}.getClass().getGenericSuperclass();
JsonbDeserializer<List<MyEntity>> deserializer = new JsonbDeserializer<>(listType);
List<MyEntity> actuals = deserializer.deserialize("topic",
"[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]".getBytes());
assertNotNull(actuals);
assertEquals(2, actuals.size());
}

@Test
void shouldDeserializeNullAsNullString() {
JsonbDeserializer<MyEntity> deserializer = new JsonbDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", "null".getBytes());
assertNull(results);
}

@Test
void shouldDeserializeNullAsNull() {
JsonbDeserializer<MyEntity> deserializer = new JsonbDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", null);
assertNull(results);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

class JsonbSerializerTest {

@Test
void shouldSerializeEntity() {
JsonbSerializer<MyEntity> serializer = new JsonbSerializer<>();
MyEntity entity = new MyEntity(1, "entity1");
byte[] result = serializer.serialize("topic", entity);
assertNotNull(result);
assertEquals("{\"id\":1,\"name\":\"entity1\"}", new String(result));
}

@Test
void shouldSerializeListOfEntities() {
JsonbSerializer<List<MyEntity>> serializer = new JsonbSerializer<>();
MyEntity entity1 = new MyEntity(1, "entity1");
MyEntity entity2 = new MyEntity(2, "entity2");
byte[] result = serializer.serialize("topic", List.of(entity1, entity2));
assertNotNull(result);
assertEquals("[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]", new String(result));
}

@Test
void shouldSerializeNullAsNullString() {
JsonbSerializer<MyEntity> serializer = new JsonbSerializer<>();
byte[] results = serializer.serialize("topic", null);
assertNotNull(results);
assertEquals("null", new String(results));
}

@Test
void shouldSerializeNullAsNull() {
JsonbSerializer<MyEntity> serializer = new JsonbSerializer<>();
serializer.configure(Map.of(JsonbSerializer.NULL_AS_NULL_CONFIG, "true"), false);
byte[] results = serializer.serialize("topic", null);
assertNull(results);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.kafka.client.serialization;

import java.util.Objects;

public class MyEntity {
public long id;
public String name;

// used by deserializers
public MyEntity() {
}

public MyEntity(long id, String name) {
this.id = id;
this.name = name;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MyEntity myEntity = (MyEntity) o;
return id == myEntity.id && Objects.equals(name, myEntity.name);
}

@Override
public int hashCode() {
return Objects.hash(id, name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;

import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.type.TypeReference;

class ObjectMapperDeserializerTest {
@Test
void shouldDeserializeEntity() {
MyEntity expected = new MyEntity(1, "entity1");
ObjectMapperDeserializer<MyEntity> deserializer = new ObjectMapperDeserializer<>(MyEntity.class);
MyEntity actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes());
assertNotNull(actual);
assertEquals(expected, actual);
}

@Test
void shouldDeserializeListOfEntities() {
TypeReference<List<MyEntity>> listType = new TypeReference<>() {
};
ObjectMapperDeserializer<List<MyEntity>> deserializer = new ObjectMapperDeserializer<>(listType);
List<MyEntity> actuals = deserializer.deserialize("topic",
"[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]".getBytes());
assertNotNull(actuals);
assertEquals(2, actuals.size());
}

@Test
void shouldDeserializeNullAsNullString() {
ObjectMapperDeserializer<MyEntity> deserializer = new ObjectMapperDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", "null".getBytes());
assertNull(results);
}

@Test
void shouldDeserializeNullAsNull() {
ObjectMapperDeserializer<MyEntity> deserializer = new ObjectMapperDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", null);
assertNull(results);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

class ObjectMapperSerializerTest {

@Test
void shouldSerializeEntity() {
ObjectMapperSerializer<MyEntity> serializer = new ObjectMapperSerializer<>();
MyEntity entity = new MyEntity(1, "entity1");
byte[] result = serializer.serialize("topic", entity);
assertNotNull(result);
assertEquals("{\"id\":1,\"name\":\"entity1\"}", new String(result));
}

@Test
void shouldSerializeListOfEntities() {
ObjectMapperSerializer<List<MyEntity>> serializer = new ObjectMapperSerializer<>();
MyEntity entity1 = new MyEntity(1, "entity1");
MyEntity entity2 = new MyEntity(2, "entity2");
byte[] result = serializer.serialize("topic", List.of(entity1, entity2));
assertNotNull(result);
assertEquals("[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]", new String(result));
}

@Test
void shouldSerializeNullAsNullString() {
ObjectMapperSerializer<MyEntity> serializer = new ObjectMapperSerializer<>();
byte[] results = serializer.serialize("topic", null);
assertNotNull(results);
assertEquals("null", new String(results));
}

@Test
void shouldSerializeNullAsNull() {
ObjectMapperSerializer<MyEntity> serializer = new ObjectMapperSerializer<>();
serializer.configure(Map.of(ObjectMapperSerializer.NULL_AS_NULL_CONFIG, "true"), false);
byte[] results = serializer.serialize("topic", null);
assertNull(results);
}

}

0 comments on commit 8445d00

Please sign in to comment.