Skip to content

Commit

Permalink
quarkusio#2663 JSON-B based (de-)serialization support for Kafka clie…
Browse files Browse the repository at this point in the history
…nt module
  • Loading branch information
gunnarmorling committed Jun 19, 2019
1 parent 68bf66d commit 81bf2f6
Show file tree
Hide file tree
Showing 12 changed files with 326 additions and 162 deletions.
17 changes: 16 additions & 1 deletion extensions/kafka-client/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
<name>Quarkus - Kafka - Client - Runtime</name>

<dependencies>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonb</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
Expand All @@ -29,6 +33,17 @@
<groupId>com.oracle.substratevm</groupId>
<artifactId>svm</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5-internal</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.quarkus.kafka.client.serialization;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Deserializer;

/**
* A {@link Deserializer} that deserializes JSON using JSON-B.
*/
public class JsonbDeserializer<U> implements Deserializer<U> {

private final Jsonb jsonb;
private final Class<U> type;

public JsonbDeserializer(Class<U> type) {
this(type, JsonbBuilder.create());
}

public JsonbDeserializer(Class<U> type, Jsonb jsonb) {
this.jsonb = jsonb;
this.type = type;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public U deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}

try (InputStream is = new ByteArrayInputStream(data)) {
return jsonb.fromJson(is, type);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package io.quarkus.kafka.client.serialization;

import java.util.Map;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

/**
* A {@link Serde} that (de-)serializes JSON using JSON-B.
*/
public class JsonbSerde<T> implements Serde<T> {

private final Class<T> type;
private final Jsonb jsonb;

public JsonbSerde(Class<T> type) {
this(type, JsonbBuilder.create());
}

public JsonbSerde(Class<T> type, Jsonb jsonb) {
this.type = type;
this.jsonb = jsonb;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public void close() {
try {
jsonb.close();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public Serializer<T> serializer() {
return new JsonbSerializer<T>();
}

@Override
public Deserializer<T> deserializer() {
return new JsonbDeserializer<T>(type);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.quarkus.kafka.client.serialization;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import javax.json.bind.Jsonb;
import javax.json.bind.JsonbBuilder;

import org.apache.kafka.common.serialization.Serializer;

/**
* A {@link Serializer} that serializes to JSON using JSON-B.
*/
public class JsonbSerializer<U> implements Serializer<U> {

private final Jsonb jsonb;

public JsonbSerializer() {
jsonb = JsonbBuilder.create();
}

public JsonbSerializer(Jsonb jsonb) {
this.jsonb = jsonb;
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public byte[] serialize(String topic, U data) {
try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
jsonb.toJson(data, output);
return output.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.quarkus.kafka.client.serde;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;

public class JsonbSerdeTest {

@Test
public void shouldSerializeAndDeserializeEntity() {
MyEntity entity = new MyEntity();
entity.id = 42L;
entity.name = "Bob";

try (JsonbSerde<MyEntity> serde = new JsonbSerde<>(MyEntity.class)) {
byte[] serialized = serde.serializer().serialize("my-topic", entity);
MyEntity deserialized = serde.deserializer().deserialize("my-topic", serialized);

assertThat(deserialized.id).isEqualTo(42L);
assertThat(deserialized.name).isEqualTo("Bob");
}
}

public static class MyEntity {
public long id;
public String name;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.quarkus.it.kafka.streams;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Category {

public String name;
public String value;

public Category() {
}

public Category(String name, String value) {
this.name = name;
this.value = value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.it.kafka.streams;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class Customer {

public int id;
public String name;
public int category;

public Customer() {
}

public Customer(int id, String name, int category) {
this.id = id;
this.name = name;
this.category = category;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.quarkus.it.kafka.streams;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class EnrichedCustomer {

public int id;
public String name;
public Category category;

public EnrichedCustomer() {
}

public EnrichedCustomer(int id, String name, Category category) {
this.id = id;
this.name = name;
this.category = category;
}
}
Loading

0 comments on commit 81bf2f6

Please sign in to comment.