From 81bf2f664d2684e77c873f90d6424cd638603efd Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Tue, 18 Jun 2019 18:06:44 +0200 Subject: [PATCH] #2663 JSON-B based (de-)serialization support for Kafka client module --- extensions/kafka-client/runtime/pom.xml | 17 ++- .../serialization/JsonbDeserializer.java | 50 ++++++++ .../client/serialization/JsonbSerde.java | 51 ++++++++ .../client/serialization/JsonbSerializer.java | 44 +++++++ .../kafka/client/serde/JsonbSerdeTest.java | 28 ++++ .../io/quarkus/it/kafka/JsonObjectSerde.java | 84 ------------ .../io/quarkus/it/kafka/streams/Category.java | 18 +++ .../io/quarkus/it/kafka/streams/Customer.java | 20 +++ .../it/kafka/streams/EnrichedCustomer.java | 20 +++ .../{ => streams}/KafkaStreamsPipeline.java | 34 +++-- .../{ => streams}/KafkaStreamsITCase.java | 2 +- .../kafka/{ => streams}/KafkaStreamsTest.java | 120 +++++++++--------- 12 files changed, 326 insertions(+), 162 deletions(-) create mode 100644 extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java create mode 100644 extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java create mode 100644 extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java create mode 100644 extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java delete mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java rename integration-tests/kafka/src/main/java/io/quarkus/it/kafka/{ => streams}/KafkaStreamsPipeline.java (82%) rename integration-tests/kafka/src/test/java/io/quarkus/it/kafka/{ => streams}/KafkaStreamsITCase.java (76%) rename integration-tests/kafka/src/test/java/io/quarkus/it/kafka/{ => streams}/KafkaStreamsTest.java (54%) diff --git a/extensions/kafka-client/runtime/pom.xml b/extensions/kafka-client/runtime/pom.xml index d3b6af0639df66..843aa403a347cd 100644 --- a/extensions/kafka-client/runtime/pom.xml +++ b/extensions/kafka-client/runtime/pom.xml @@ -14,11 +14,15 @@ Quarkus - Kafka - Client - Runtime - io.quarkus quarkus-core + + io.quarkus + quarkus-jsonb + true + org.apache.kafka @@ -29,6 +33,17 @@ com.oracle.substratevm svm + + + io.quarkus + quarkus-junit5-internal + test + + + org.assertj + assertj-core + test + diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java new file mode 100644 index 00000000000000..17164636617a7a --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbDeserializer.java @@ -0,0 +1,50 @@ +package io.quarkus.kafka.client.serialization; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import javax.json.bind.Jsonb; +import javax.json.bind.JsonbBuilder; + +import org.apache.kafka.common.serialization.Deserializer; + +/** + * A {@link Deserializer} that deserializes JSON using JSON-B. + */ +public class JsonbDeserializer implements Deserializer { + + private final Jsonb jsonb; + private final Class type; + + public JsonbDeserializer(Class type) { + this(type, JsonbBuilder.create()); + } + + public JsonbDeserializer(Class type, Jsonb jsonb) { + this.jsonb = jsonb; + this.type = type; + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public U deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + try (InputStream is = new ByteArrayInputStream(data)) { + return jsonb.fromJson(is, type); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java new file mode 100644 index 00000000000000..e517ca4fdf76eb --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerde.java @@ -0,0 +1,51 @@ +package io.quarkus.kafka.client.serialization; + +import java.util.Map; + +import javax.json.bind.Jsonb; +import javax.json.bind.JsonbBuilder; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +/** + * A {@link Serde} that (de-)serializes JSON using JSON-B. + */ +public class JsonbSerde implements Serde { + + private final Class type; + private final Jsonb jsonb; + + public JsonbSerde(Class type) { + this(type, JsonbBuilder.create()); + } + + public JsonbSerde(Class type, Jsonb jsonb) { + this.type = type; + this.jsonb = jsonb; + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public void close() { + try { + jsonb.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Serializer serializer() { + return new JsonbSerializer(); + } + + @Override + public Deserializer deserializer() { + return new JsonbDeserializer(type); + } +} diff --git a/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java new file mode 100644 index 00000000000000..b093c4190ffe58 --- /dev/null +++ b/extensions/kafka-client/runtime/src/main/java/io/quarkus/kafka/client/serialization/JsonbSerializer.java @@ -0,0 +1,44 @@ +package io.quarkus.kafka.client.serialization; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Map; + +import javax.json.bind.Jsonb; +import javax.json.bind.JsonbBuilder; + +import org.apache.kafka.common.serialization.Serializer; + +/** + * A {@link Serializer} that serializes to JSON using JSON-B. + */ +public class JsonbSerializer implements Serializer { + + private final Jsonb jsonb; + + public JsonbSerializer() { + jsonb = JsonbBuilder.create(); + } + + public JsonbSerializer(Jsonb jsonb) { + this.jsonb = jsonb; + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] serialize(String topic, U data) { + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + jsonb.toJson(data, output); + return output.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } +} diff --git a/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java new file mode 100644 index 00000000000000..5ecb6ff948aad0 --- /dev/null +++ b/extensions/kafka-client/runtime/src/test/java/io/quarkus/kafka/client/serde/JsonbSerdeTest.java @@ -0,0 +1,28 @@ +package io.quarkus.kafka.client.serde; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + +public class JsonbSerdeTest { + + @Test + public void shouldSerializeAndDeserializeEntity() { + MyEntity entity = new MyEntity(); + entity.id = 42L; + entity.name = "Bob"; + + try (JsonbSerde serde = new JsonbSerde<>(MyEntity.class)) { + byte[] serialized = serde.serializer().serialize("my-topic", entity); + MyEntity deserialized = serde.deserializer().deserialize("my-topic", serialized); + + assertThat(deserialized.id).isEqualTo(42L); + assertThat(deserialized.name).isEqualTo("Bob"); + } + } + + public static class MyEntity { + public long id; + public String name; + } +} diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java deleted file mode 100644 index d5cb3fe43763a8..00000000000000 --- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonObjectSerde.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.quarkus.it.kafka; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Map; - -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serializer; - -/** - * A {@link Serde} that (de-)serializes JSON. - */ -public class JsonObjectSerde implements Serde { - - public JsonObjectSerde() { - } - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public void close() { - } - - @Override - public Serializer serializer() { - return new JsonSerializer(); - } - - @Override - public Deserializer deserializer() { - return new JsonDeserializer(); - } - - private final class JsonDeserializer implements Deserializer { - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public JsonObject deserialize(String topic, byte[] data) { - if (data == null) { - return null; - } - - try (JsonReader reader = Json.createReader(new ByteArrayInputStream(data))) { - return reader.readObject(); - } - } - - @Override - public void close() { - } - } - - private final class JsonSerializer implements Serializer { - - @Override - public void configure(Map configs, boolean isKey) { - } - - @Override - public byte[] serialize(String topic, JsonObject data) { - try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { - Json.createWriter(output).writeObject(data); - return output.toByteArray(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() { - } - } -} diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java new file mode 100644 index 00000000000000..dab14a57c432c6 --- /dev/null +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Category.java @@ -0,0 +1,18 @@ +package io.quarkus.it.kafka.streams; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class Category { + + public String name; + public String value; + + public Category() { + } + + public Category(String name, String value) { + this.name = name; + this.value = value; + } +} diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java new file mode 100644 index 00000000000000..c18b2d1d174682 --- /dev/null +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/Customer.java @@ -0,0 +1,20 @@ +package io.quarkus.it.kafka.streams; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class Customer { + + public int id; + public String name; + public int category; + + public Customer() { + } + + public Customer(int id, String name, int category) { + this.id = id; + this.name = name; + this.category = category; + } +} diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java new file mode 100644 index 00000000000000..826d344e2d7bfc --- /dev/null +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/EnrichedCustomer.java @@ -0,0 +1,20 @@ +package io.quarkus.it.kafka.streams; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class EnrichedCustomer { + + public int id; + public String name; + public Category category; + + public EnrichedCustomer() { + } + + public EnrichedCustomer(int id, String name, Category category) { + this.id = id; + this.name = name; + this.category = category; + } +} diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsPipeline.java similarity index 82% rename from integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java rename to integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsPipeline.java index d7cebe8a7312f9..561887f80f5675 100644 --- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java +++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsPipeline.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.streams; import static org.awaitility.Awaitility.await; @@ -14,9 +14,6 @@ import javax.enterprise.context.ApplicationScoped; import javax.enterprise.event.Observes; -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonObjectBuilder; import javax.ws.rs.Consumes; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -45,6 +42,7 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.Stores; +import io.quarkus.kafka.client.serialization.JsonbSerde; import io.quarkus.runtime.StartupEvent; @ApplicationScoped @@ -68,30 +66,30 @@ void onStart(@Observes StartupEvent ev) { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder = new StreamsBuilder(); - JsonObjectSerde jsonNodeSerde = new JsonObjectSerde(); - KTable categories = builder.table( + JsonbSerde categorySerde = new JsonbSerde<>(Category.class); + JsonbSerde customerSerde = new JsonbSerde<>(Customer.class); + JsonbSerde enrichedCustomerSerde = new JsonbSerde<>(EnrichedCustomer.class); + + KTable categories = builder.table( CATEGORIES_TOPIC_NAME, - Consumed.with(Serdes.Integer(), jsonNodeSerde)); + Consumed.with(Serdes.Integer(), categorySerde)); - KStream customers = builder - .stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), jsonNodeSerde)) - .selectKey((k, v) -> v.getJsonNumber("category").intValue()) + KStream customers = builder + .stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), customerSerde)) + .selectKey((id, customer) -> customer.category) .join( categories, - (v1, v2) -> { - JsonObjectBuilder target = Json.createObjectBuilder(); - v1.forEach(target::add); - target.add("category", v2); - return target.build(); + (customer, category) -> { + return new EnrichedCustomer(customer.id, customer.name, category); }, - Joined.with(Serdes.Integer(), jsonNodeSerde, null)); + Joined.with(Serdes.Integer(), customerSerde, categorySerde)); KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("countstore"); customers.groupByKey() .count(Materialized. as(storeSupplier)); - customers.selectKey((k, v) -> v.getJsonNumber("id").intValue()) - .to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde)); + customers.selectKey((categoryId, customer) -> customer.id) + .to("streams-test-customers-processed", Produced.with(Serdes.Integer(), enrichedCustomerSerde)); streams = new KafkaStreams(builder.build(), props); diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsITCase.java similarity index 76% rename from integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsITCase.java index 2410b099dc1bda..874db3541e6603 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsITCase.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsITCase.java @@ -1,4 +1,4 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.streams; import io.quarkus.test.junit.SubstrateTest; diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java similarity index 54% rename from integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java rename to integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java index 17e0a3afe8230b..28cfba7027dd0f 100644 --- a/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/KafkaStreamsTest.java +++ b/integration-tests/kafka/src/test/java/io/quarkus/it/kafka/streams/KafkaStreamsTest.java @@ -1,16 +1,11 @@ -package io.quarkus.it.kafka; +package io.quarkus.it.kafka.streams; -import java.io.StringReader; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import javax.json.Json; -import javax.json.JsonObject; -import javax.json.JsonReader; - import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -22,11 +17,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import io.quarkus.it.kafka.KafkaTestResource; +import io.quarkus.kafka.client.serialization.JsonbDeserializer; +import io.quarkus.kafka.client.serialization.JsonbSerializer; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; @@ -35,26 +31,36 @@ @QuarkusTest public class KafkaStreamsTest { - private static Producer createProducer() { + private static Producer createProducer() { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); + props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonbSerializer.class.getName()); + + return new KafkaProducer(props); + } + + private static Producer createCategoryProducer() { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonbSerializer.class.getName()); - return new KafkaProducer(props); + return new KafkaProducer<>(props); } - private static KafkaConsumer createConsumer() { + private static KafkaConsumer createConsumer() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-test-consumer"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, EnrichedCustomerDeserializer.class.getName()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - KafkaConsumer consumer = new KafkaConsumer<>(props); + KafkaConsumer consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singletonList("streams-test-customers-processed")); return consumer; } @@ -63,40 +69,40 @@ private static KafkaConsumer createConsumer() { public void testKafkaStreams() throws Exception { produceCustomers(); - Consumer consumer = createConsumer(); - List> records = poll(consumer, 4); + Consumer consumer = createConsumer(); + List> records = poll(consumer, 4); - ConsumerRecord record = records.get(0); + ConsumerRecord record = records.get(0); Assertions.assertEquals(101, record.key()); - JsonObject customer = parse(record.value()); - Assertions.assertEquals(101, customer.getInt("id")); - Assertions.assertEquals("Bob", customer.getString("name")); - Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name")); - Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value")); + EnrichedCustomer customer = record.value(); + Assertions.assertEquals(101, customer.id); + Assertions.assertEquals("Bob", customer.name); + Assertions.assertEquals("B2B", customer.category.name); + Assertions.assertEquals("business-to-business", customer.category.value); record = records.get(1); Assertions.assertEquals(102, record.key()); - customer = parse(record.value()); - Assertions.assertEquals(102, customer.getInt("id")); - Assertions.assertEquals("Becky", customer.getString("name")); - Assertions.assertEquals("B2C", customer.getJsonObject("category").getString("name")); - Assertions.assertEquals("business-to-customer", customer.getJsonObject("category").getString("value")); + customer = record.value(); + Assertions.assertEquals(102, customer.id); + Assertions.assertEquals("Becky", customer.name); + Assertions.assertEquals("B2C", customer.category.name); + Assertions.assertEquals("business-to-customer", customer.category.value); record = records.get(2); Assertions.assertEquals(103, record.key()); - customer = parse(record.value()); - Assertions.assertEquals(103, customer.getInt("id")); - Assertions.assertEquals("Bruce", customer.getString("name")); - Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name")); - Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value")); + customer = record.value(); + Assertions.assertEquals(103, customer.id); + Assertions.assertEquals("Bruce", customer.name); + Assertions.assertEquals("B2B", customer.category.name); + Assertions.assertEquals("business-to-business", customer.category.value); record = records.get(3); Assertions.assertEquals(104, record.key()); - customer = parse(record.value()); - Assertions.assertEquals(104, customer.getInt("id")); - Assertions.assertEquals("Bert", customer.getString("name")); - Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name")); - Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value")); + customer = record.value(); + Assertions.assertEquals(104, customer.id); + Assertions.assertEquals("Bert", customer.name); + Assertions.assertEquals("B2B", customer.category.name); + Assertions.assertEquals("business-to-business", customer.category.value); assertCategoryCount(1, 3); assertCategoryCount(2, 1); @@ -107,21 +113,17 @@ record = records.get(3); } private void produceCustomers() { - Producer producer = createProducer(); - - producer.send(new ProducerRecord<>("streams-test-categories", 1, - "{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }")); - producer.send(new ProducerRecord<>("streams-test-categories", 2, - "{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }")); - - producer.send( - new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }")); - producer.send(new ProducerRecord<>("streams-test-customers", 102, - "{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }")); - producer.send(new ProducerRecord<>("streams-test-customers", 103, - "{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }")); - producer.send(new ProducerRecord<>("streams-test-customers", 104, - "{ \"id\" : 104, \"name\" : \"Bert\", \"category\" : 1 }")); + Producer producer = createProducer(); + + Producer categoryProducer = createCategoryProducer(); + + categoryProducer.send(new ProducerRecord<>("streams-test-categories", 1, new Category("B2B", "business-to-business"))); + categoryProducer.send(new ProducerRecord<>("streams-test-categories", 2, new Category("B2C", "business-to-customer"))); + + producer.send(new ProducerRecord<>("streams-test-customers", 101, new Customer(101, "Bob", 1))); + producer.send(new ProducerRecord<>("streams-test-customers", 102, new Customer(102, "Becky", 2))); + producer.send(new ProducerRecord<>("streams-test-customers", 103, new Customer(103, "Bruce", 1))); + producer.send(new ProducerRecord<>("streams-test-customers", 104, new Customer(104, "Bert", 1))); } private void assertCategoryCount(int categoryId, int expectedCount) throws Exception { @@ -146,11 +148,12 @@ private Integer getCategoryCount(int categoryId) { return null; } - private List> poll(Consumer consumer, int expectedRecordCount) { + private List> poll(Consumer consumer, + int expectedRecordCount) { int fetched = 0; - List> result = new ArrayList<>(); + List> result = new ArrayList<>(); while (fetched < expectedRecordCount) { - ConsumerRecords records = consumer.poll(Duration.ofMillis(20000)); + ConsumerRecords records = consumer.poll(Duration.ofMillis(20000)); records.forEach(result::add); fetched = result.size(); } @@ -158,9 +161,10 @@ private List> poll(Consumer con return result; } - private JsonObject parse(String json) { - try (JsonReader reader = Json.createReader(new StringReader(json))) { - return reader.readObject(); + public static class EnrichedCustomerDeserializer extends JsonbDeserializer { + + public EnrichedCustomerDeserializer() { + super(EnrichedCustomer.class); } } }