Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka serializer null handling #19543

Merged
merged 2 commits into from
Aug 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,10 @@ First, you need to include the `quarkus-jackson` extension.
There is an existing `ObjectMapperSerializer` that can be used to serialize all data objects via Jackson.
You may create an empty subclass if you want to use <<serialization-autodetection>>.

NOTE: By default, the `ObjectMapperSerializer` serializes null as the `"null"` String, this can be customized by setting the Kafka configuration
property `json.serialize.null-as-null=true` which will serialize null as `null`.
This is handy when using a compacted topic, as `null` is used as a tombstone to know which messages delete during compaction phase.

The corresponding deserializer class needs to be subclassed.
So, let's create a `FruitDeserializer` that extends the `ObjectMapperDeserializer`.

Expand Down Expand Up @@ -1150,6 +1154,25 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali
Now, your Kafka messages will contain a Jackson serialized representation of your `Fruit` data object.
In this case, the `deserializer` configuration is not necessary as the <<serialization-autodetection>> is enabled by default.

If you want to deserialize a list of fruits, you need to create a deserializer with a Jackson `TypeReference` denoted the generic collection used.

[source,java]
----
package com.acme.fruit.jackson;

import java.util.List;
import com.fasterxml.jackson.core.type.TypeReference;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

public class ListOfFruitDeserializer extends ObjectMapperDeserializer<List<Fruit>> {
public ListOfFruitDeserializer() {
TypeReference<List<Fruit>> listType = new TypeReference<>() {
};
super(listType);
}
}
----

[[jsonb-serialization]]
=== Serializing via JSON-B

Expand All @@ -1166,6 +1189,10 @@ First, you need to include the `quarkus-jsonb` extension.
There is an existing `JsonbSerializer` that can be used to serialize all data objects via JSON-B.
You may create an empty subclass if you want to use <<serialization-autodetection>>.

NOTE: By default, the `JsonbSerializer` serializes null as the `"null"` String, this can be customized by setting the Kafka configuration
property `json.serialize.null-as-null=true` which will serialize null as `null`.
This is handy when using a compacted topic, as `null` is used as a tombstone to know which messages delete during compaction phase.

The corresponding deserializer class needs to be subclassed.
So, let's create a `FruitDeserializer` that extends the generic `JsonbDeserializer`.

Expand Down Expand Up @@ -1199,6 +1226,24 @@ mp.messaging.outgoing.fruit-out.value.serializer=io.quarkus.kafka.client.seriali

Now, your Kafka messages will contain a JSON-B serialized representation of your `Fruit` data object.

If you want to deserialize a list of fruits, you need to create a deserializer with a `Type` denoted the generic collection used.

[source,java]
----
package com.acme.fruit.jsonb;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import io.quarkus.kafka.client.serialization.JsonbDeserializer;

public class ListOfFruitDeserializer extends JsonbDeserializer<List<Fruit>> {
public ListOfFruitDeserializer() {
Type listType = new ArrayList<MyEntity>() {}.getClass().getGenericSuperclass();
super(listType);
}
}
----

NOTE: If you don't want to create a deserializer for each data object, you can use the generic `io.vertx.kafka.client.serialization.JsonObjectDeserializer`
that will deserialize to a `io.vertx.core.json.JsonObject`. The corresponding serializer can also be used: `io.vertx.kafka.client.serialization.JsonObjectSerializer`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
* A {@link Serializer} that serializes to JSON using JSON-B.
*/
public class JsonbSerializer<T> implements Serializer<T> {
public static final String NULL_AS_NULL_CONFIG = "json.serialize.null-as-null";

private final Jsonb jsonb;
private final boolean jsonbNeedsClosing;

private boolean nullAsNull = false;

public JsonbSerializer() {
this(JsonbProducer.get(), true);
}
Expand All @@ -31,10 +34,17 @@ private JsonbSerializer(Jsonb jsonb, boolean jsonbNeedsClosing) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (configs.containsKey(NULL_AS_NULL_CONFIG) && Boolean.parseBoolean((String) configs.get(NULL_AS_NULL_CONFIG))) {
nullAsNull = true;
}
}

@Override
public byte[] serialize(String topic, T data) {
if (nullAsNull && data == null) {
return null;
}

try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
jsonb.toJson(data, output);
return output.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
* A {@link Deserializer} that deserializes JSON using Jackson's ObjectMapper.
*/
public class ObjectMapperSerializer<T> implements Serializer<T> {
public static final String NULL_AS_NULL_CONFIG = "json.serialize.null-as-null";

private final ObjectMapper objectMapper;

private boolean nullAsNull = false;

public ObjectMapperSerializer() {
this(ObjectMapperProducer.get());
}
Expand All @@ -26,10 +29,17 @@ public ObjectMapperSerializer(ObjectMapper objectMapper) {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
if (configs.containsKey(NULL_AS_NULL_CONFIG) && Boolean.parseBoolean((String) configs.get(NULL_AS_NULL_CONFIG))) {
nullAsNull = true;
}
}

@Override
public byte[] serialize(String topic, T data) {
if (nullAsNull && data == null) {
return null;
}

try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
objectMapper.writeValue(output, data);
return output.toByteArray();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

import org.junit.jupiter.api.Test;

class JsonbDeserializerTest {
@Test
void shouldDeserializeEntity() {
MyEntity expected = new MyEntity(1, "entity1");
JsonbDeserializer<MyEntity> deserializer = new JsonbDeserializer<>(MyEntity.class);
MyEntity actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes());
assertNotNull(actual);
assertEquals(expected, actual);
}

@Test
void shouldDeserializeListOfEntities() {
Type listType = new ArrayList<MyEntity>() {
}.getClass().getGenericSuperclass();
JsonbDeserializer<List<MyEntity>> deserializer = new JsonbDeserializer<>(listType);
List<MyEntity> actuals = deserializer.deserialize("topic",
"[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]".getBytes());
assertNotNull(actuals);
assertEquals(2, actuals.size());
}

@Test
void shouldDeserializeNullAsNullString() {
JsonbDeserializer<MyEntity> deserializer = new JsonbDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", "null".getBytes());
assertNull(results);
}

@Test
void shouldDeserializeNullAsNull() {
JsonbDeserializer<MyEntity> deserializer = new JsonbDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", null);
assertNull(results);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

class JsonbSerializerTest {

@Test
void shouldSerializeEntity() {
JsonbSerializer<MyEntity> serializer = new JsonbSerializer<>();
MyEntity entity = new MyEntity(1, "entity1");
byte[] result = serializer.serialize("topic", entity);
assertNotNull(result);
assertEquals("{\"id\":1,\"name\":\"entity1\"}", new String(result));
}

@Test
void shouldSerializeListOfEntities() {
JsonbSerializer<List<MyEntity>> serializer = new JsonbSerializer<>();
MyEntity entity1 = new MyEntity(1, "entity1");
MyEntity entity2 = new MyEntity(2, "entity2");
byte[] result = serializer.serialize("topic", List.of(entity1, entity2));
assertNotNull(result);
assertEquals("[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]", new String(result));
}

@Test
void shouldSerializeNullAsNullString() {
JsonbSerializer<MyEntity> serializer = new JsonbSerializer<>();
byte[] results = serializer.serialize("topic", null);
assertNotNull(results);
assertEquals("null", new String(results));
}

@Test
void shouldSerializeNullAsNull() {
JsonbSerializer<MyEntity> serializer = new JsonbSerializer<>();
serializer.configure(Map.of(JsonbSerializer.NULL_AS_NULL_CONFIG, "true"), false);
byte[] results = serializer.serialize("topic", null);
assertNull(results);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.quarkus.kafka.client.serialization;

import java.util.Objects;

public class MyEntity {
public long id;
public String name;

// used by deserializers
public MyEntity() {
}

public MyEntity(long id, String name) {
this.id = id;
this.name = name;
}

@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
MyEntity myEntity = (MyEntity) o;
return id == myEntity.id && Objects.equals(name, myEntity.name);
}

@Override
public int hashCode() {
return Objects.hash(id, name);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;

import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.type.TypeReference;

class ObjectMapperDeserializerTest {
@Test
void shouldDeserializeEntity() {
MyEntity expected = new MyEntity(1, "entity1");
ObjectMapperDeserializer<MyEntity> deserializer = new ObjectMapperDeserializer<>(MyEntity.class);
MyEntity actual = deserializer.deserialize("topic", "{\"id\":1,\"name\":\"entity1\"}".getBytes());
assertNotNull(actual);
assertEquals(expected, actual);
}

@Test
void shouldDeserializeListOfEntities() {
TypeReference<List<MyEntity>> listType = new TypeReference<>() {
};
ObjectMapperDeserializer<List<MyEntity>> deserializer = new ObjectMapperDeserializer<>(listType);
List<MyEntity> actuals = deserializer.deserialize("topic",
"[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]".getBytes());
assertNotNull(actuals);
assertEquals(2, actuals.size());
}

@Test
void shouldDeserializeNullAsNullString() {
ObjectMapperDeserializer<MyEntity> deserializer = new ObjectMapperDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", "null".getBytes());
assertNull(results);
}

@Test
void shouldDeserializeNullAsNull() {
ObjectMapperDeserializer<MyEntity> deserializer = new ObjectMapperDeserializer<>(MyEntity.class);
MyEntity results = deserializer.deserialize("topic", null);
assertNull(results);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.quarkus.kafka.client.serialization;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;

import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;

class ObjectMapperSerializerTest {

@Test
void shouldSerializeEntity() {
ObjectMapperSerializer<MyEntity> serializer = new ObjectMapperSerializer<>();
MyEntity entity = new MyEntity(1, "entity1");
byte[] result = serializer.serialize("topic", entity);
assertNotNull(result);
assertEquals("{\"id\":1,\"name\":\"entity1\"}", new String(result));
}

@Test
void shouldSerializeListOfEntities() {
ObjectMapperSerializer<List<MyEntity>> serializer = new ObjectMapperSerializer<>();
MyEntity entity1 = new MyEntity(1, "entity1");
MyEntity entity2 = new MyEntity(2, "entity2");
byte[] result = serializer.serialize("topic", List.of(entity1, entity2));
assertNotNull(result);
assertEquals("[{\"id\":1,\"name\":\"entity1\"},{\"id\":2,\"name\":\"entity2\"}]", new String(result));
}

@Test
void shouldSerializeNullAsNullString() {
ObjectMapperSerializer<MyEntity> serializer = new ObjectMapperSerializer<>();
byte[] results = serializer.serialize("topic", null);
assertNotNull(results);
assertEquals("null", new String(results));
}

@Test
void shouldSerializeNullAsNull() {
ObjectMapperSerializer<MyEntity> serializer = new ObjectMapperSerializer<>();
serializer.configure(Map.of(ObjectMapperSerializer.NULL_AS_NULL_CONFIG, "true"), false);
byte[] results = serializer.serialize("topic", null);
assertNull(results);
}

}