diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 011d8f6e5b0c4..6eb67162f84d5 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -43,6 +43,10 @@
io.quarkus
quarkus-integration-test-shared-library
+
+ org.awaitility
+ awaitility
+
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
index a67ebe9c84971..83c156158c111 100644
--- a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
@@ -1,11 +1,16 @@
package io.quarkus.it.kafka;
+import static org.awaitility.Awaitility.await;
+
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
@@ -41,11 +46,13 @@
import org.apache.kafka.streams.state.Stores;
import io.quarkus.runtime.StartupEvent;
-
@ApplicationScoped
@Path("/kafkastreams")
public class KafkaStreamsPipeline {
+ private static final String CATEGORIES_TOPIC_NAME = "streams-test-categories";
+ private static final String CUSTOMERS_TOPIC_NAME = "streams-test-customers";
+
private KafkaStreams streams;
private ExecutorService executor;
@@ -62,11 +69,11 @@ void onStart(@Observes StartupEvent ev) {
JsonObjectSerde jsonNodeSerde = new JsonObjectSerde();
KTable categories = builder.table(
- "streams-test-categories",
+ CATEGORIES_TOPIC_NAME,
Consumed.with(Serdes.Integer(), jsonNodeSerde));
KStream customers = builder
- .stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
+ .stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), jsonNodeSerde))
.selectKey((k, v) -> v.getJsonNumber("category").intValue())
.join(
categories,
@@ -123,24 +130,20 @@ private void waitForTopicsToBeCreated(String bootstrapServers) {
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (AdminClient adminClient = AdminClient.create(config)) {
- AtomicBoolean topicsCreated = new AtomicBoolean(false);
+ await().until(topicsCreated(adminClient, CATEGORIES_TOPIC_NAME, CUSTOMERS_TOPIC_NAME));
+ }
+ }
- while (topicsCreated.get() == false) {
+ private Callable topicsCreated(AdminClient adminClient, String... expectedTopics) {
+ return new Callable() {
+
+ @Override
+ public Boolean call() throws Exception {
ListTopicsResult topics = adminClient.listTopics();
- topics.names().whenComplete((t, e) -> {
- if (e != null) {
- throw new RuntimeException(e);
- } else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) {
- topicsCreated.set(true);
- }
- });
-
- try {
- Thread.sleep(50);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ Set topicNames = topics.names().get(10, TimeUnit.SECONDS);
+
+ return topicNames.containsAll(Arrays.asList(expectedTopics));
}
- }
- }
+ };
+ }
}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
index 6afe7bb469298..72c7e90f3764e 100644
--- a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
@@ -1,11 +1,16 @@
package io.quarkus.it.main;
+import java.io.StringReader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonReader;
+
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -63,27 +68,35 @@ public void testKafkaStreams() throws Exception {
ConsumerRecord record = records.get(0);
Assertions.assertEquals(101, record.key());
- Assertions.assertEquals(
- "{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
- record.value());
+ JsonObject customer = parse(record.value());
+ Assertions.assertEquals(101, customer.getInt("id"));
+ Assertions.assertEquals("Bob", customer.getString("name"));
+ Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
+ Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));
record = records.get(1);
Assertions.assertEquals(102, record.key());
- Assertions.assertEquals(
- "{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}",
- record.value());
+ customer = parse(record.value());
+ Assertions.assertEquals(102, customer.getInt("id"));
+ Assertions.assertEquals("Becky", customer.getString("name"));
+ Assertions.assertEquals("B2C", customer.getJsonObject("category").getString("name"));
+ Assertions.assertEquals("business-to-customer", customer.getJsonObject("category").getString("value"));
record = records.get(2);
Assertions.assertEquals(103, record.key());
- Assertions.assertEquals(
- "{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
- record.value());
+ customer = parse(record.value());
+ Assertions.assertEquals(103, customer.getInt("id"));
+ Assertions.assertEquals("Bruce", customer.getString("name"));
+ Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
+ Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));
record = records.get(3);
Assertions.assertEquals(104, record.key());
- Assertions.assertEquals(
- "{\"id\":104,\"name\":\"Bert\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
- record.value());
+ customer = parse(record.value());
+ Assertions.assertEquals(104, customer.getInt("id"));
+ Assertions.assertEquals("Bert", customer.getString("name"));
+ Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
+ Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));
assertCategoryCount(1, 3);
assertCategoryCount(2, 1);
@@ -144,4 +157,10 @@ private List> poll(Consumer con
return result;
}
+
+ private JsonObject parse(String json) {
+ try(JsonReader reader = Json.createReader(new StringReader(json))) {
+ return reader.readObject();
+ }
+ }
}