Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JsonbSerde to Kafka client module #2880

Merged
merged 1 commit into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion extensions/kafka-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
<name>Quarkus - Kafka - Client - Runtime</name>

<dependencies>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonb</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -29,6 +33,17 @@
<groupId>com.oracle.substratevm</groupId>
<artifactId>svm</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package io.quarkus.kafka.client.serialization;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Deserializer;

/**
* A {@link Deserializer} that deserializes JSON using JSON-B.
*/
public class JsonbDeserializer<T> implements Deserializer<T> {

private final Jsonb jsonb;
private final Class<T> type;
private final boolean jsonbNeedsClosing;

public JsonbDeserializer(Class<T> type) {
this(type, JsonbBuilder.create(), true);
}

public JsonbDeserializer(Class<T> type, Jsonb jsonb) {
this(type, jsonb, false);
}

private JsonbDeserializer(Class<T> type, Jsonb jsonb, boolean jsonbNeedsClosing) {
this.type = type;
this.jsonb = jsonb;
this.jsonbNeedsClosing = jsonbNeedsClosing;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public T deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}

try (InputStream is = new ByteArrayInputStream(data)) {
return jsonb.fromJson(is, type);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() {
if (!jsonbNeedsClosing) {
return;
}

try {
jsonb.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.quarkus.kafka.client.serialization;

import java.util.Map;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* A {@link Serde} that (de-)serializes JSON using JSON-B.
*/
public class JsonbSerde<T> implements Serde<T> {

private final Jsonb jsonb;
private final boolean jsonbNeedsClosing;

private final JsonbSerializer<T> serializer;
private final JsonbDeserializer<T> deserializer;

public JsonbSerde(Class<T> type) {
this(type, JsonbBuilder.create(), true);
}

public JsonbSerde(Class<T> type, Jsonb jsonb) {
this(type, jsonb, false);
}

private JsonbSerde(Class<T> type, Jsonb jsonb, boolean jsonbNeedsClosing) {
this.jsonb = jsonb;
this.jsonbNeedsClosing = jsonbNeedsClosing;

this.serializer = new JsonbSerializer<T>(jsonb);
this.deserializer = new JsonbDeserializer<T>(type, jsonb);
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public void close() {
serializer.close();
deserializer.close();

if (jsonbNeedsClosing) {
try {
jsonb.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

@Override
public Serializer<T> serializer() {
return serializer;
}

@Override
public Deserializer<T> deserializer() {
return deserializer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.quarkus.kafka.client.serialization;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Serializer;

/**
* A {@link Serializer} that serializes to JSON using JSON-B.
*/
public class JsonbSerializer<T> implements Serializer<T> {

private final Jsonb jsonb;
private final boolean jsonbNeedsClosing;

public JsonbSerializer() {
this(JsonbBuilder.create(), true);
}

public JsonbSerializer(Jsonb jsonb) {
this(jsonb, false);
}

private JsonbSerializer(Jsonb jsonb, boolean jsonbNeedsClosing) {
this.jsonb = jsonb;
this.jsonbNeedsClosing = jsonbNeedsClosing;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public byte[] serialize(String topic, T data) {
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
jsonb.toJson(data, output);
return output.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() {
cescoffier marked this conversation as resolved.
Show resolved Hide resolved
if (!jsonbNeedsClosing) {
return;
}

try {
jsonb.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.quarkus.kafka.client.serde;

import static org.assertj.core.api.Assertions.assertThat;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.junit.jupiter.api.Test;

import io.quarkus.kafka.client.serialization.JsonbSerde;

public class JsonbSerdeTest {

@Test
public void shouldSerializeAndDeserializeEntity() {
MyEntity entity = new MyEntity();
entity.id = 42L;
entity.name = "Bob";

try (JsonbSerde<MyEntity> serde = new JsonbSerde<>(MyEntity.class)) {
byte[] serialized = serde.serializer().serialize("my-topic", entity);
MyEntity deserialized = serde.deserializer().deserialize("my-topic", serialized);

assertThat(deserialized.id).isEqualTo(42L);
assertThat(deserialized.name).isEqualTo("Bob");
}
}

@Test
public void shouldSerializeAndDeserializeEntityWithGivenJsonb() throws Exception {
MyEntity entity = new MyEntity();
entity.id = 42L;
entity.name = "Bob";

try (Jsonb jsonb = JsonbBuilder.create(); JsonbSerde<MyEntity> serde = new JsonbSerde<>(MyEntity.class, jsonb)) {
byte[] serialized = serde.serializer().serialize("my-topic", entity);
MyEntity deserialized = serde.deserializer().deserialize("my-topic", serialized);

assertThat(deserialized.id).isEqualTo(42L);
assertThat(deserialized.name).isEqualTo("Bob");
}
}

public static class MyEntity {
public long id;
public String name;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.it.kafka.streams;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Category {

public String name;
public String value;

public Category() {
}

public Category(String name, String value) {
this.name = name;
this.value = value;
}
}
Loading