Skip to content

Commit

Permalink
#2663 Cleaning up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling committed Jun 4, 2019
1 parent 41d278e commit 78e6452
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 33 deletions.
4 changes: 4 additions & 0 deletions integration-tests/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-integration-test-shared-library</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>

<!-- JAX-RS -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.quarkus.it.kafka;

import static org.awaitility.Awaitility.await;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
Expand Down Expand Up @@ -41,11 +46,13 @@
import org.apache.kafka.streams.state.Stores;

import io.quarkus.runtime.StartupEvent;

@ApplicationScoped
@Path("/kafkastreams")
public class KafkaStreamsPipeline {

private static final String CATEGORIES_TOPIC_NAME = "streams-test-categories";
private static final String CUSTOMERS_TOPIC_NAME = "streams-test-customers";

private KafkaStreams streams;

private ExecutorService executor;
Expand All @@ -62,11 +69,11 @@ void onStart(@Observes StartupEvent ev) {

JsonObjectSerde jsonNodeSerde = new JsonObjectSerde();
KTable<Integer, JsonObject> categories = builder.table(
"streams-test-categories",
CATEGORIES_TOPIC_NAME,
Consumed.with(Serdes.Integer(), jsonNodeSerde));

KStream<Integer, JsonObject> customers = builder
.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
.stream(CUSTOMERS_TOPIC_NAME, Consumed.with(Serdes.Integer(), jsonNodeSerde))
.selectKey((k, v) -> v.getJsonNumber("category").intValue())
.join(
categories,
Expand Down Expand Up @@ -123,24 +130,20 @@ private void waitForTopicsToBeCreated(String bootstrapServers) {
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

try (AdminClient adminClient = AdminClient.create(config)) {
AtomicBoolean topicsCreated = new AtomicBoolean(false);
await().until(topicsCreated(adminClient, CATEGORIES_TOPIC_NAME, CUSTOMERS_TOPIC_NAME));
}
}

while (topicsCreated.get() == false) {
private Callable<Boolean> topicsCreated(AdminClient adminClient, String... expectedTopics) {
return new Callable<Boolean>() {

@Override
public Boolean call() throws Exception {
ListTopicsResult topics = adminClient.listTopics();
topics.names().whenComplete((t, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) {
topicsCreated.set(true);
}
});

try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Set<String> topicNames = topics.names().get(10, TimeUnit.SECONDS);

return topicNames.containsAll(Arrays.asList(expectedTopics));
}
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package io.quarkus.it.main;

import java.io.StringReader;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonReader;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -63,27 +68,35 @@ public void testKafkaStreams() throws Exception {

ConsumerRecord<Integer, String> record = records.get(0);
Assertions.assertEquals(101, record.key());
Assertions.assertEquals(
"{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());
JsonObject customer = parse(record.value());
Assertions.assertEquals(101, customer.getInt("id"));
Assertions.assertEquals("Bob", customer.getString("name"));
Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));

record = records.get(1);
Assertions.assertEquals(102, record.key());
Assertions.assertEquals(
"{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}",
record.value());
customer = parse(record.value());
Assertions.assertEquals(102, customer.getInt("id"));
Assertions.assertEquals("Becky", customer.getString("name"));
Assertions.assertEquals("B2C", customer.getJsonObject("category").getString("name"));
Assertions.assertEquals("business-to-customer", customer.getJsonObject("category").getString("value"));

record = records.get(2);
Assertions.assertEquals(103, record.key());
Assertions.assertEquals(
"{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());
customer = parse(record.value());
Assertions.assertEquals(103, customer.getInt("id"));
Assertions.assertEquals("Bruce", customer.getString("name"));
Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));

record = records.get(3);
Assertions.assertEquals(104, record.key());
Assertions.assertEquals(
"{\"id\":104,\"name\":\"Bert\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());
customer = parse(record.value());
Assertions.assertEquals(104, customer.getInt("id"));
Assertions.assertEquals("Bert", customer.getString("name"));
Assertions.assertEquals("B2B", customer.getJsonObject("category").getString("name"));
Assertions.assertEquals("business-to-business", customer.getJsonObject("category").getString("value"));

assertCategoryCount(1, 3);
assertCategoryCount(2, 1);
Expand Down Expand Up @@ -144,4 +157,10 @@ private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> con

return result;
}

private JsonObject parse(String json) {
try(JsonReader reader = Json.createReader(new StringReader(json))) {
return reader.readObject();
}
}
}

0 comments on commit 78e6452

Please sign in to comment.