Skip to content

Commit

Permalink
#2663 Adding integration test for Kafka Streams
Browse files Browse the repository at this point in the history
  • Loading branch information
gunnarmorling committed Jun 2, 2019
1 parent 6b676b2 commit cc59c45
Show file tree
Hide file tree
Showing 5 changed files with 326 additions and 1 deletion.
10 changes: 9 additions & 1 deletion integration-tests/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,20 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<!-- Kafka -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-client</artifactId>
</dependency>

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down Expand Up @@ -151,6 +158,7 @@
<addAllCharsets>true</addAllCharsets>
<cleanupServer>true</cleanupServer>
<enableHttpUrlHandler>true</enableHttpUrlHandler>
<enableJni>true</enableJni>
<!-- Requires Quarkus Graal fork to work, will fail otherwise
<enableRetainedHeapReporting>true</enableRetainedHeapReporting>
<enableCodeSizeReporting>true</enableCodeSizeReporting>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package io.quarkus.it.kafka;

import java.io.IOException;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
* A {@link Serde} that (de-)serializes JSON.
*/
public class JsonNodeSerde implements Serde<JsonNode> {

private final ObjectMapper mapper;

public JsonNodeSerde() {
mapper = new ObjectMapper();
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public void close() {
}

@Override
public Serializer<JsonNode> serializer() {
return new JsonSerializer();
}

@Override
public Deserializer<JsonNode> deserializer() {
return new JsonDeserializer();
}

private final class JsonDeserializer implements Deserializer<JsonNode> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public JsonNode deserialize(String topic, byte[] data) {
if (data == null) {
return null;
}

try {
return mapper.readTree(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() {
}
}

private final class JsonSerializer implements Serializer<JsonNode> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}

@Override
public byte[] serialize(String topic, JsonNode data) {
try {
return mapper.writeValueAsBytes(data);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() {
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package io.quarkus.it.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.event.Observes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

import io.quarkus.runtime.StartupEvent;

@ApplicationScoped
@Path("/kafkastreams")
public class KafkaStreamsPipeline {

private KafkaStreams streams;

private ExecutorService executor;

void onStart(@Observes StartupEvent ev) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-pipeline");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();

JsonNodeSerde jsonNodeSerde = new JsonNodeSerde();

KTable<Integer, JsonNode> categories = builder.table("streams-test-categories",
Consumed.with(Serdes.Integer(), jsonNodeSerde));

builder.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
.selectKey((k, v) -> v.get("category").asInt())
.join(
categories,
(v1, v2) -> {
((ObjectNode) v1).replace("category", v2);
return v1;
},
Joined.with(Serdes.Integer(), jsonNodeSerde, null))
.selectKey((k, v) -> v.get("id").asInt())
.to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde));

streams = new KafkaStreams(builder.build(), props);

executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
waitForTopicsToBeCreated("localhost:19092");
streams.start();
});
}

@POST
@Path("/stop")
public void stop() {
streams.close();
}

private void waitForTopicsToBeCreated(String bootstrapServers) {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

try (AdminClient adminClient = AdminClient.create(config)) {
AtomicBoolean topicsCreated = new AtomicBoolean(false);

while (topicsCreated.get() == false) {
ListTopicsResult topics = adminClient.listTopics();
topics.names().whenComplete((t, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) {
topicsCreated.set(true);
}
});

try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.quarkus.it.main;

import io.quarkus.test.junit.SubstrateTest;

@SubstrateTest
public class KafkaStreamsITCase extends KafkaStreamsTest {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io.quarkus.it.main;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;

@QuarkusTestResource(KafkaTestResource.class)
@QuarkusTest
public class KafkaStreamsTest {

private static Producer<Integer, String> createProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

return new KafkaProducer<Integer, String>(props);
}

private static KafkaConsumer<Integer, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-test-consumer");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("streams-test-customers-processed"));
return consumer;
}

@AfterEach
public void stopKafkaStreams() {
// explicitly stopping the pipeline *before* the broker is shut down, as it
// otherwise will time out
RestAssured.post("/kafkastreams/stop");
}

@Test
public void testKafkaStreams() throws Exception {
Producer<Integer, String> producer = createProducer();

producer.send(new ProducerRecord<>("streams-test-categories", 1,
"{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
producer.send(new ProducerRecord<>("streams-test-categories", 2,
"{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));

producer.send(
new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 102,
"{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
producer.send(new ProducerRecord<>("streams-test-customers", 103,
"{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));

Consumer<Integer, String> consumer = createConsumer();
List<ConsumerRecord<Integer, String>> records = poll(consumer, 3);

ConsumerRecord<Integer, String> record = records.get(0);
Assertions.assertEquals(101, record.key());
Assertions.assertEquals(
"{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());

record = records.get(1);
Assertions.assertEquals(102, record.key());
Assertions.assertEquals(
"{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}",
record.value());

record = records.get(2);
Assertions.assertEquals(103, record.key());
Assertions.assertEquals(
"{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}",
record.value());
}

private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) {
int fetched = 0;
List<ConsumerRecord<Integer, String>> result = new ArrayList<>();
while (fetched < expectedRecordCount) {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(20000));
records.forEach(result::add);
fetched = result.size();
}

return result;
}

}

0 comments on commit cc59c45

Please sign in to comment.