Skip to content

Commit

Permalink
#2663 Adding test for Kafka Streams interactive queries
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling committed Jun 3, 2019
1 parent 7d7650b commit 5d882d9
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@
import javax.json.Json;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
Expand All @@ -23,10 +28,17 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;

import io.quarkus.runtime.StartupEvent;

Expand All @@ -51,10 +63,10 @@ void onStart(@Observes StartupEvent ev) {
JsonObjectSerde jsonNodeSerde = new JsonObjectSerde();
KTable<Integer, JsonObject> categories = builder.table(
"streams-test-categories",
Consumed.with(Serdes.Integer(), jsonNodeSerde)
);
Consumed.with(Serdes.Integer(), jsonNodeSerde));

builder.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
KStream<Integer, JsonObject> customers = builder
.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
.selectKey((k, v) -> v.getJsonNumber("category").intValue())
.join(
categories,
Expand All @@ -64,9 +76,13 @@ void onStart(@Observes StartupEvent ev) {
target.add("category", v2);
return target.build();
},
Joined.with(Serdes.Integer(), jsonNodeSerde, null)
)
.selectKey((k, v) -> v.getJsonNumber("id").intValue())
Joined.with(Serdes.Integer(), jsonNodeSerde, null));

KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("countstore");
customers.groupByKey()
.count(Materialized.<Integer, Long> as(storeSupplier));

customers.selectKey((k, v) -> v.getJsonNumber("id").intValue())
.to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde));

streams = new KafkaStreams(builder.build(), props);
Expand All @@ -78,12 +94,30 @@ void onStart(@Observes StartupEvent ev) {
});
}

private ReadOnlyKeyValueStore<Integer, Long> getCountstore() {
while (true) {
try {
return streams.store("countstore", QueryableStoreTypes.keyValueStore());
} catch (InvalidStateStoreException e) {
// ignore, store not ready yet
}
}
}

@POST
@Path("/stop")
public void stop() {
streams.close();
}

@GET
@Path("/category/{id}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Long getCategory(@PathParam("id") int id) {
return getCountstore().get(id);
}

private void waitForTopicsToBeCreated(String bootstrapServers) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -55,31 +54,12 @@ private static KafkaConsumer<Integer, String> createConsumer() {
return consumer;
}

@AfterEach
public void stopKafkaStreams() {
// explicitly stopping the pipeline *before* the broker is shut down, as it
// otherwise will time out
RestAssured.post("/kafkastreams/stop");
}

@Test
public void testKafkaStreams() throws Exception {
Producer<Integer, String> producer = createProducer();

producer.send(new ProducerRecord<>("streams-test-categories", 1,
"{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
producer.send(new ProducerRecord<>("streams-test-categories", 2,
"{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));

producer.send(
new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 102,
"{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 103,
"{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));
produceCustomers();

Consumer<Integer, String> consumer = createConsumer();
List<ConsumerRecord<Integer, String>> records = poll(consumer, 3);
List<ConsumerRecord<Integer, String>> records = poll(consumer, 4);

ConsumerRecord<Integer, String> record = records.get(0);
Assertions.assertEquals(101, record.key());
Expand All @@ -98,6 +78,59 @@ record = records.get(2);
Assertions.assertEquals(
"{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());

record = records.get(3);
Assertions.assertEquals(104, record.key());
Assertions.assertEquals(
"{\"id\":104,\"name\":\"Bert\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());

assertCategoryCount(1, 3);
assertCategoryCount(2, 1);

// explicitly stopping the pipeline *before* the broker is shut down, as it
// otherwise will time out
RestAssured.post("/kafkastreams/stop");
}

private void produceCustomers() {
Producer<Integer, String> producer = createProducer();

producer.send(new ProducerRecord<>("streams-test-categories", 1,
"{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
producer.send(new ProducerRecord<>("streams-test-categories", 2,
"{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));

producer.send(
new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 102,
"{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 103,
"{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 104,
"{ \"id\" : 104, \"name\" : \"Bert\", \"category\" : 1 }"));
}

private void assertCategoryCount(int categoryId, int expectedCount) throws Exception {
int i = 0;
Integer actual = null;

// retrying for some time as the aggregation might not have finished yet
while (i < 50 && !Integer.valueOf(expectedCount).equals(actual)) {
actual = getCategoryCount(categoryId);
Thread.sleep(100);
}

Assertions.assertEquals(expectedCount, actual);
}

private Integer getCategoryCount(int categoryId) {
String result = RestAssured.when().get("/kafkastreams/category/" + categoryId).asString();
if (result != null && !result.trim().isEmpty()) {
return Integer.valueOf(result);
}

return null;
}

private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) {
Expand All @@ -111,5 +144,4 @@ private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> con

return result;
}

}

0 comments on commit 5d882d9

Please sign in to comment.