diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
index 5d94a5472d316..a67ebe9c84971 100644
--- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
@@ -12,8 +12,13 @@
 import javax.json.Json;
 import javax.json.JsonObject;
 import javax.json.JsonObjectBuilder;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AdminClient;
@@ -23,10 +28,17 @@
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 
 import io.quarkus.runtime.StartupEvent;
 
@@ -51,10 +63,10 @@ void onStart(@Observes StartupEvent ev) {
         JsonObjectSerde jsonNodeSerde = new JsonObjectSerde();
         KTable<Integer, JsonObject> categories = builder.table(
                 "streams-test-categories",
-                Consumed.with(Serdes.Integer(), jsonNodeSerde)
-        );
+                Consumed.with(Serdes.Integer(), jsonNodeSerde));
 
-        builder.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
+        KStream<Integer, JsonObject> customers = builder
+                .stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
                 .selectKey((k, v) -> v.getJsonNumber("category").intValue())
                 .join(
                         categories,
@@ -64,9 +76,13 @@ void onStart(@Observes StartupEvent ev) {
                             target.add("category", v2);
                             return target.build();
                         },
-                        Joined.with(Serdes.Integer(), jsonNodeSerde, null)
-                )
-                .selectKey((k, v) -> v.getJsonNumber("id").intValue())
+                        Joined.with(Serdes.Integer(), jsonNodeSerde, null));
+
+        KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("countstore");
+        customers.groupByKey()
+                .count(Materialized.<Integer, Long> as(storeSupplier));
+
+        customers.selectKey((k, v) -> v.getJsonNumber("id").intValue())
                 .to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde));
 
         streams = new KafkaStreams(builder.build(), props);
@@ -78,12 +94,30 @@ void onStart(@Observes StartupEvent ev) {
         });
     }
 
+    private ReadOnlyKeyValueStore<Integer, Long> getCountstore() {
+        while (true) {
+            try {
+                return streams.store("countstore", QueryableStoreTypes.keyValueStore());
+            } catch (InvalidStateStoreException e) {
+                // ignore, store not ready yet
+            }
+        }
+    }
+
     @POST
     @Path("/stop")
     public void stop() {
         streams.close();
     }
 
+    @GET
+    @Path("/category/{id}")
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Long getCategory(@PathParam("id") int id) {
+        return getCountstore().get(id);
+    }
+
     private void waitForTopicsToBeCreated(String bootstrapServers) {
         Map<String, Object> config = new HashMap<>();
         config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
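A note on the getCountstore() retry added above: the while (true) loop blocks the calling REST thread for as long as the store is unqueryable (for instance while the streams instance is still starting or rebalancing), and it retries without any pause. A bounded variant that backs off briefly could look like the following sketch; maxAttempts and retryPauseMs are illustrative names, not part of this change:

    // Sketch only: bounded retry with a short pause, instead of spinning forever.
    private ReadOnlyKeyValueStore<Integer, Long> getCountstore() throws InterruptedException {
        final int maxAttempts = 50;      // illustrative bound, not in the PR
        final long retryPauseMs = 100;   // illustrative pause, not in the PR

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return streams.store("countstore", QueryableStoreTypes.keyValueStore());
            } catch (InvalidStateStoreException e) {
                // store not ready yet; back off briefly before the next attempt
                Thread.sleep(retryPauseMs);
            }
        }
        throw new IllegalStateException("countstore not queryable after " + maxAttempts + " attempts");
    }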
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
index 9b30c625c7873..6afe7bb469298 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -55,31 +54,12 @@ private static KafkaConsumer<Integer, String> createConsumer() {
         return consumer;
     }
 
-    @AfterEach
-    public void stopKafkaStreams() {
-        // explicitly stopping the pipeline *before* the broker is shut down, as it
-        // otherwise will time out
-        RestAssured.post("/kafkastreams/stop");
-    }
-
     @Test
     public void testKafkaStreams() throws Exception {
-        Producer<Integer, String> producer = createProducer();
-
-        producer.send(new ProducerRecord<>("streams-test-categories", 1,
-                "{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
-        producer.send(new ProducerRecord<>("streams-test-categories", 2,
-                "{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));
-
-        producer.send(
-                new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
-        producer.send(new ProducerRecord<>("streams-test-customers", 102,
-                "{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
-        producer.send(new ProducerRecord<>("streams-test-customers", 103,
-                "{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));
+        produceCustomers();
 
         Consumer<Integer, String> consumer = createConsumer();
-        List<ConsumerRecord<Integer, String>> records = poll(consumer, 3);
+        List<ConsumerRecord<Integer, String>> records = poll(consumer, 4);
 
         ConsumerRecord<Integer, String> record = records.get(0);
         Assertions.assertEquals(101, record.key());
@@ -98,6 +78,60 @@ record = records.get(2);
         Assertions.assertEquals(
                 "{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
                 record.value());
+
+        record = records.get(3);
+        Assertions.assertEquals(104, record.key());
+        Assertions.assertEquals(
+                "{\"id\":104,\"name\":\"Bert\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
+                record.value());
+
+        assertCategoryCount(1, 3);
+        assertCategoryCount(2, 1);
+
+        // explicitly stopping the pipeline *before* the broker is shut down, as it
+        // otherwise will time out
+        RestAssured.post("/kafkastreams/stop");
+    }
+
+    private void produceCustomers() {
+        Producer<Integer, String> producer = createProducer();
+
+        producer.send(new ProducerRecord<>("streams-test-categories", 1,
+                "{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
+        producer.send(new ProducerRecord<>("streams-test-categories", 2,
+                "{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));
+
+        producer.send(
+                new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
+        producer.send(new ProducerRecord<>("streams-test-customers", 102,
+                "{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
+        producer.send(new ProducerRecord<>("streams-test-customers", 103,
+                "{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));
+        producer.send(new ProducerRecord<>("streams-test-customers", 104,
+                "{ \"id\" : 104, \"name\" : \"Bert\", \"category\" : 1 }"));
+    }
+
+    private void assertCategoryCount(int categoryId, int expectedCount) throws Exception {
+        int i = 0;
+        Integer actual = null;
+
+        // retrying for some time as the aggregation might not have finished yet
+        while (i < 50 && !Integer.valueOf(expectedCount).equals(actual)) {
+            actual = getCategoryCount(categoryId);
+            Thread.sleep(100);
+            i++;
+        }
+
+        Assertions.assertEquals(expectedCount, actual);
+    }
+
+    private Integer getCategoryCount(int categoryId) {
+        String result = RestAssured.when().get("/kafkastreams/category/" + categoryId).asString();
+        if (result != null && !result.trim().isEmpty()) {
+            return Integer.valueOf(result);
+        }
+
+        return null;
     }
 
     private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) {
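For the count assertions above: produceCustomers() writes customers 101, 103 and 104 with "category" : 1 and customer 102 with "category" : 2, so after the pipeline's groupByKey().count() aggregation the countstore should hold 3 for category 1 and 1 for category 2, which is exactly what assertCategoryCount(1, 3) and assertCategoryCount(2, 1) verify. The same bookkeeping mirrored in plain Java (illustrative only, not part of the PR):

    // Mirrors the category ids of customers 101-104 and the counts asserted above.
    Map<Integer, Long> counts = new HashMap<>();
    for (int category : new int[] { 1, 2, 1, 1 }) {
        counts.merge(category, 1L, Long::sum);
    }
    // counts now holds {1=3, 2=1}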
@@ -111,5 +145,4 @@ private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> con
 
         return result;
     }
-
 }
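The test keeps relying on the pre-existing poll(...) helper, which this diff only touches to drop a trailing blank line; its body lies outside the hunks shown. A helper with the signature used above would plausibly look like this sketch (assumed shape, not the file's actual code; it additionally needs java.time.Duration, java.util.ArrayList and ConsumerRecords imports):

    private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) {
        List<ConsumerRecord<Integer, String>> result = new ArrayList<>();
        // keep fetching until the expected number of records has arrived;
        // the surrounding test timeout bounds the overall wait
        while (result.size() < expectedRecordCount) {
            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(10));
            records.forEach(result::add);
        }
        return result;
    }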