diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml
index d3b6af0639df6..843aa403a347c 100644
--- a/extensions/kafka-client/runtime/pom.xml
+++ b/extensions/kafka-client/runtime/pom.xml
@@ -14,11 +14,15 @@
Quarkus - Kafka - Client - Runtime
-
io.quarkus
quarkus-core
+
+ io.quarkus
+ quarkus-jsonb
+ true
+
org.apache.kafka
@@ -29,6 +33,17 @@
com.oracle.substratevm
svm
+
+
+ io.quarkus
+ quarkus-junit5-internal
+ test
+
+
+ org.assertj
+ assertj-core
+ test
+
diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java
new file mode 100644
index 0000000000000..42a12a03c8c70
--- /dev/null
+++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java
@@ -0,0 +1,65 @@
+package io.quarkus.kafka.client.serialization;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * A {@link Deserializer} that deserializes JSON using JSON-B.
+ */
+public class JsonbDeserializer implements Deserializer {
+
+ private final Jsonb jsonb;
+ private final Class type;
+ private final boolean jsonbNeedsClosing;
+
+ public JsonbDeserializer(Class type) {
+ this(type, JsonbBuilder.create(), true);
+ }
+
+ public JsonbDeserializer(Class type, Jsonb jsonb) {
+ this(type, jsonb, false);
+ }
+
+ private JsonbDeserializer(Class type, Jsonb jsonb, boolean jsonbNeedsClosing) {
+ this.type = type;
+ this.jsonb = jsonb;
+ this.jsonbNeedsClosing = jsonbNeedsClosing;
+ }
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+ }
+
+ @Override
+ public T deserialize(String topic, byte[] data) {
+ if (data == null) {
+ return null;
+ }
+
+ try (InputStream is = new ByteArrayInputStream(data)) {
+ return jsonb.fromJson(is, type);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (!jsonbNeedsClosing) {
+ return;
+ }
+
+ try {
+ jsonb.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java
new file mode 100644
index 0000000000000..c14e3ba7ef12d
--- /dev/null
+++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java
@@ -0,0 +1,66 @@
+package io.quarkus.kafka.client.serialization;
+
+import java.util.Map;
+
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * A {@link Serde} that (de-)serializes JSON using JSON-B.
+ */
+public class JsonbSerde implements Serde {
+
+ private final Jsonb jsonb;
+ private final boolean jsonbNeedsClosing;
+
+ private final JsonbSerializer serializer;
+ private final JsonbDeserializer deserializer;
+
+ public JsonbSerde(Class type) {
+ this(type, JsonbBuilder.create(), true);
+ }
+
+ public JsonbSerde(Class type, Jsonb jsonb) {
+ this(type, jsonb, false);
+ }
+
+ private JsonbSerde(Class type, Jsonb jsonb, boolean jsonbNeedsClosing) {
+ this.jsonb = jsonb;
+ this.jsonbNeedsClosing = jsonbNeedsClosing;
+
+ this.serializer = new JsonbSerializer(jsonb);
+ this.deserializer = new JsonbDeserializer(type, jsonb);
+ }
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+ }
+
+ @Override
+ public void close() {
+ serializer.close();
+ deserializer.close();
+
+ if (jsonbNeedsClosing) {
+ try {
+ jsonb.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public Serializer serializer() {
+ return serializer;
+ }
+
+ @Override
+ public Deserializer deserializer() {
+ return deserializer;
+ }
+}
diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java
new file mode 100644
index 0000000000000..67789c6d93900
--- /dev/null
+++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java
@@ -0,0 +1,59 @@
+package io.quarkus.kafka.client.serialization;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * A {@link Serializer} that serializes to JSON using JSON-B.
+ */
+public class JsonbSerializer implements Serializer {
+
+ private final Jsonb jsonb;
+ private final boolean jsonbNeedsClosing;
+
+ public JsonbSerializer() {
+ this(JsonbBuilder.create(), true);
+ }
+
+ public JsonbSerializer(Jsonb jsonb) {
+ this(jsonb, false);
+ }
+
+ private JsonbSerializer(Jsonb jsonb, boolean jsonbNeedsClosing) {
+ this.jsonb = jsonb;
+ this.jsonbNeedsClosing = jsonbNeedsClosing;
+ }
+
+ @Override
+ public void configure(Map configs, boolean isKey) {
+ }
+
+ @Override
+ public byte[] serialize(String topic, T data) {
+ try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
+ jsonb.toJson(data, output);
+ return output.toByteArray();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (!jsonbNeedsClosing) {
+ return;
+ }
+
+ try {
+ jsonb.close();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java
new file mode 100644
index 0000000000000..a3e74b92710c0
--- /dev/null
+++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java
@@ -0,0 +1,48 @@
+package io.quarkus.kafka.client.serde;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import javax.json.bind.Jsonb;
+import javax.json.bind.JsonbBuilder;
+
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.kafka.client.serialization.JsonbSerde;
+
+public class JsonbSerdeTest {
+
+ @Test
+ public void shouldSerializeAndDeserializeEntity() {
+ MyEntity entity = new MyEntity();
+ entity.id = 42L;
+ entity.name = "Bob";
+
+ try (JsonbSerde serde = new JsonbSerde<>(MyEntity.class)) {
+ byte[] serialized = serde.serializer().serialize("my-topic", entity);
+ MyEntity deserialized = serde.deserializer().deserialize("my-topic", serialized);
+
+ assertThat(deserialized.id).isEqualTo(42L);
+ assertThat(deserialized.name).isEqualTo("Bob");
+ }
+ }
+
+ @Test
+ public void shouldSerializeAndDeserializeEntityWithGivenJsonb() throws Exception {
+ MyEntity entity = new MyEntity();
+ entity.id = 42L;
+ entity.name = "Bob";
+
+ try (Jsonb jsonb = JsonbBuilder.create(); JsonbSerde serde = new JsonbSerde<>(MyEntity.class, jsonb)) {
+ byte[] serialized = serde.serializer().serialize("my-topic", entity);
+ MyEntity deserialized = serde.deserializer().deserialize("my-topic", serialized);
+
+ assertThat(deserialized.id).isEqualTo(42L);
+ assertThat(deserialized.name).isEqualTo("Bob");
+ }
+ }
+
+ public static class MyEntity {
+ public long id;
+ public String name;
+ }
+}
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java
deleted file mode 100644
index d5cb3fe43763a..0000000000000
--- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package io.quarkus.it.kafka;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.Map;
-
-import javax.json.Json;
-import javax.json.JsonObject;
-import javax.json.JsonReader;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serializer;
-
-/**
- * A {@link Serde} that (de-)serializes JSON.
- */
-public class JsonObjectSerde implements Serde {
-
- public JsonObjectSerde() {
- }
-
- @Override
- public void configure(Map configs, boolean isKey) {
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Serializer serializer() {
- return new JsonSerializer();
- }
-
- @Override
- public Deserializer deserializer() {
- return new JsonDeserializer();
- }
-
- private final class JsonDeserializer implements Deserializer {
-
- @Override
- public void configure(Map configs, boolean isKey) {
- }
-
- @Override
- public JsonObject deserialize(String topic, byte[] data) {
- if (data == null) {
- return null;
- }
-
- try (JsonReader reader = Json.createReader(new ByteArrayInputStream(data))) {
- return reader.readObject();
- }
- }
-
- @Override
- public void close() {
- }
- }
-
- private final class JsonSerializer implements Serializer {
-
- @Override
- public void configure(Map configs, boolean isKey) {
- }
-
- @Override
- public byte[] serialize(String topic, JsonObject data) {
- try (ByteArrayOutputStream output = new ByteArrayOutputStream()) {
- Json.createWriter(output).writeObject(data);
- return output.toByteArray();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- }
- }
-}
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java
new file mode 100644
index 0000000000000..dab14a57c432c
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java
@@ -0,0 +1,18 @@
+package io.quarkus.it.kafka.streams;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class Category {
+
+ public String name;
+ public String value;
+
+ public Category() {
+ }
+
+ public Category(String name, String value) {
+ this.name = name;
+ this.value = value;
+ }
+}
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java
new file mode 100644
index 0000000000000..c18b2d1d17468
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java
@@ -0,0 +1,20 @@
+package io.quarkus.it.kafka.streams;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class Customer {
+
+ public int id;
+ public String name;
+ public int category;
+
+ public Customer() {
+ }
+
+ public Customer(int id, String name, int category) {
+ this.id = id;
+ this.name = name;
+ this.category = category;
+ }
+}
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java
new file mode 100644
index 0000000000000..826d344e2d7bf
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java
@@ -0,0 +1,20 @@
+package io.quarkus.it.kafka.streams;
+
+import io.quarkus.runtime.annotations.RegisterForReflection;
+
+@RegisterForReflection
+public class EnrichedCustomer {
+
+ public int id;
+ public String name;
+ public Category category;
+
+ public EnrichedCustomer() {
+ }
+
+ public EnrichedCustomer(int id, String name, Category category) {
+ this.id = id;
+ this.name = name;
+ this.category = category;
+ }
+}
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsPipeline.java
similarity index 82%
rename from integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
rename to integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsPipeline.java
index d7cebe8a7312f..561887f80f567 100644
--- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsPipeline.java
@@ -1,4 +1,4 @@
-package io.quarkus.it.kafka;
+package io.quarkus.it.kafka.streams;
import static org.awaitility.Awaitility.await;
@@ -14,9 +14,6 @@
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
-import javax.json.Json;
-import javax.json.JsonObject;
-import javax.json.JsonObjectBuilder;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -45,6 +42,7 @@
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
+import io.quarkus.kafka.client.serialization.JsonbSerde;
import io.quarkus.runtime.StartupEvent;
@ApplicationScoped
@@ -68,30 +66,30 @@ void onStart(@Observes StartupEvent ev) {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
- JsonObjectSerde jsonNodeSerde = new JsonObjectSerde();
- KTable categories = builder.table(
+ JsonbSerde categorySerde = new JsonbSerde<>(Category.class);
+ JsonbSerde customerSerde = new JsonbSerde<>(Customer.class);
+ JsonbSerde enrichedCustomerSerde = new JsonbSerde<>(EnrichedCustomer.class);
+
+ KTable categories = builder.table(
CATEGORIES_TOPIC_NAME,
- Consumed.with(Serdes.Integer(), jsonNodeSerde));
+ Consumed.with(Serdes.Integer(), categorySerde));
- KStream customers = builder
- .stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), jsonNodeSerde))
- .selectKey((k, v) -> v.getJsonNumber("category").intValue())
+ KStream customers = builder
+ .stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), customerSerde))
+ .selectKey((id, customer) -> customer.category)
.join(
categories,
- (v1, v2) -> {
- JsonObjectBuilder target = Json.createObjectBuilder();
- v1.forEach(target::add);
- target.add("category", v2);
- return target.build();
+ (customer, category) -> {
+ return new EnrichedCustomer(customer.id, customer.name, category);
},
- Joined.with(Serdes.Integer(), jsonNodeSerde, null));
+ Joined.with(Serdes.Integer(), customerSerde, categorySerde));
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("countstore");
customers.groupByKey()
.count(Materialized. as(storeSupplier));
- customers.selectKey((k, v) -> v.getJsonNumber("id").intValue())
- .to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde));
+ customers.selectKey((categoryId, customer) -> customer.id)
+ .to("streams-test-customers-processed", Produced.with(Serdes.Integer(), enrichedCustomerSerde));
streams = new KafkaStreams(builder.build(), props);
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsITCase.java
similarity index 76%
rename from integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java
rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsITCase.java
index 2410b099dc1bd..874db3541e660 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsITCase.java
@@ -1,4 +1,4 @@
-package io.quarkus.it.kafka;
+package io.quarkus.it.kafka.streams;
import io.quarkus.test.junit.SubstrateTest;
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
similarity index 54%
rename from integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java
rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
index 17e0a3afe8230..28cfba7027dd0 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java
@@ -1,16 +1,11 @@
-package io.quarkus.it.kafka;
+package io.quarkus.it.kafka.streams;
-import java.io.StringReader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
-import javax.json.Json;
-import javax.json.JsonObject;
-import javax.json.JsonReader;
-
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,11 +17,12 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import io.quarkus.it.kafka.KafkaTestResource;
+import io.quarkus.kafka.client.serialization.JsonbDeserializer;
+import io.quarkus.kafka.client.serialization.JsonbSerializer;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
@@ -35,26 +31,36 @@
@QuarkusTest
public class KafkaStreamsTest {
- private static Producer createProducer() {
+ private static Producer createProducer() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer");
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonbSerializer.class.getName());
+
+ return new KafkaProducer(props);
+ }
+
+ private static Producer createCategoryProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonbSerializer.class.getName());
- return new KafkaProducer(props);
+ return new KafkaProducer<>(props);
}
- private static KafkaConsumer createConsumer() {
+ private static KafkaConsumer createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedCustomerDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("streams-test-customers-processed"));
return consumer;
}
@@ -63,40 +69,40 @@ private static KafkaConsumer createConsumer() {
public void testKafkaStreams() throws Exception {
produceCustomers();
- Consumer consumer = createConsumer();
- List> records = poll(consumer, 4);
+ Consumer consumer = createConsumer();
+ List> records = poll(consumer, 4);
- ConsumerRecord record = records.get(0);
+ ConsumerRecord record = records.get(0);
Assertions.assertEquals(101, record.key());
- JsonObject customer = parse(record.value());
- Assertions.assertEquals(101, customer.getInt("id"));
- Assertions.assertEquals("Bob", customer.getString("name"));
- Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
- Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));
+ EnrichedCustomer customer = record.value();
+ Assertions.assertEquals(101, customer.id);
+ Assertions.assertEquals("Bob", customer.name);
+ Assertions.assertEquals("B2B", customer.category.name);
+ Assertions.assertEquals("business-to-business", customer.category.value);
record = records.get(1);
Assertions.assertEquals(102, record.key());
- customer = parse(record.value());
- Assertions.assertEquals(102, customer.getInt("id"));
- Assertions.assertEquals("Becky", customer.getString("name"));
- Assertions.assertEquals("B2C", customer.getJsonObject("category").getString("name"));
- Assertions.assertEquals("business-to-customer", customer.getJsonObject("category").getString("value"));
+ customer = record.value();
+ Assertions.assertEquals(102, customer.id);
+ Assertions.assertEquals("Becky", customer.name);
+ Assertions.assertEquals("B2C", customer.category.name);
+ Assertions.assertEquals("business-to-customer", customer.category.value);
record = records.get(2);
Assertions.assertEquals(103, record.key());
- customer = parse(record.value());
- Assertions.assertEquals(103, customer.getInt("id"));
- Assertions.assertEquals("Bruce", customer.getString("name"));
- Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
- Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));
+ customer = record.value();
+ Assertions.assertEquals(103, customer.id);
+ Assertions.assertEquals("Bruce", customer.name);
+ Assertions.assertEquals("B2B", customer.category.name);
+ Assertions.assertEquals("business-to-business", customer.category.value);
record = records.get(3);
Assertions.assertEquals(104, record.key());
- customer = parse(record.value());
- Assertions.assertEquals(104, customer.getInt("id"));
- Assertions.assertEquals("Bert", customer.getString("name"));
- Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
- Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));
+ customer = record.value();
+ Assertions.assertEquals(104, customer.id);
+ Assertions.assertEquals("Bert", customer.name);
+ Assertions.assertEquals("B2B", customer.category.name);
+ Assertions.assertEquals("business-to-business", customer.category.value);
assertCategoryCount(1, 3);
assertCategoryCount(2, 1);
@@ -107,21 +113,17 @@ record = records.get(3);
}
private void produceCustomers() {
- Producer producer = createProducer();
-
- producer.send(new ProducerRecord<>("streams-test-categories", 1,
- "{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
- producer.send(new ProducerRecord<>("streams-test-categories", 2,
- "{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));
-
- producer.send(
- new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
- producer.send(new ProducerRecord<>("streams-test-customers", 102,
- "{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
- producer.send(new ProducerRecord<>("streams-test-customers", 103,
- "{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));
- producer.send(new ProducerRecord<>("streams-test-customers", 104,
- "{ \"id\" : 104, \"name\" : \"Bert\", \"category\" : 1 }"));
+ Producer producer = createProducer();
+
+ Producer categoryProducer = createCategoryProducer();
+
+ categoryProducer.send(new ProducerRecord<>("streams-test-categories", 1, new Category("B2B", "business-to-business")));
+ categoryProducer.send(new ProducerRecord<>("streams-test-categories", 2, new Category("B2C", "business-to-customer")));
+
+ producer.send(new ProducerRecord<>("streams-test-customers", 101, new Customer(101, "Bob", 1)));
+ producer.send(new ProducerRecord<>("streams-test-customers", 102, new Customer(102, "Becky", 2)));
+ producer.send(new ProducerRecord<>("streams-test-customers", 103, new Customer(103, "Bruce", 1)));
+ producer.send(new ProducerRecord<>("streams-test-customers", 104, new Customer(104, "Bert", 1)));
}
private void assertCategoryCount(int categoryId, int expectedCount) throws Exception {
@@ -146,11 +148,12 @@ private Integer getCategoryCount(int categoryId) {
return null;
}
- private List> poll(Consumer consumer, int expectedRecordCount) {
+ private List> poll(Consumer consumer,
+ int expectedRecordCount) {
int fetched = 0;
- List> result = new ArrayList<>();
+ List> result = new ArrayList<>();
while (fetched < expectedRecordCount) {
- ConsumerRecords records = consumer.poll(Duration.ofMillis(20000));
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(20000));
records.forEach(result::add);
fetched = result.size();
}
@@ -158,9 +161,10 @@ private List> poll(Consumer con
return result;
}
- private JsonObject parse(String json) {
- try (JsonReader reader = Json.createReader(new StringReader(json))) {
- return reader.readObject();
+ public static class EnrichedCustomerDeserializer extends JsonbDeserializer {
+
+ public EnrichedCustomerDeserializer() {
+ super(EnrichedCustomer.class);
}
}
}