From 39e0b4cbf2ebbc5748397da006786af8d8ef058f Mon Sep 17 00:00:00 2001
From: Gunnar Morling
Date: Sun, 2 Jun 2019 09:54:27 +0200
Subject: [PATCH] #2663 WIP integration test

---
 integration-tests/kafka/pom.xml               |   9 +-
 .../io/quarkus/it/kafka/JsonNodeSerde.java    |  88 ++++++++++++++
 .../it/kafka/KafkaStreamsPipeline.java        | 104 +++++++++++++++++
 .../quarkus/it/main/KafkaStreamsITCase.java   |   8 ++
 .../io/quarkus/it/main/KafkaStreamsTest.java  | 109 ++++++++++++++++++
 5 files changed, 317 insertions(+), 1 deletion(-)
 create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonNodeSerde.java
 create mode 100644 integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
 create mode 100644 integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java
 create mode 100644 integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java

diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 1fc1489e15b75..d4030d2c41293 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -49,13 +49,20 @@
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-resteasy</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-kafka-client</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>io.quarkus</groupId>
+            <artifactId>quarkus-kafka-streams</artifactId>
+        </dependency>
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonNodeSerde.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonNodeSerde.java
new file mode 100644
index 0000000000000..6175d62e6cd7b
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/JsonNodeSerde.java
@@ -0,0 +1,88 @@
+package io.quarkus.it.kafka;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * A {@link Serde} that (de-)serializes JSON.
+ */
+public class JsonNodeSerde implements Serde<JsonNode> {
+
+    private final ObjectMapper mapper;
+
+    public JsonNodeSerde() {
+        mapper = new ObjectMapper();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Serializer<JsonNode> serializer() {
+        return new JsonSerializer();
+    }
+
+    @Override
+    public Deserializer<JsonNode> deserializer() {
+        return new JsonDeserializer();
+    }
+
+    private final class JsonDeserializer implements Deserializer<JsonNode> {
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+        }
+
+        @Override
+        public JsonNode deserialize(String topic, byte[] data) {
+            if (data == null) {
+                return null;
+            }
+
+            try {
+                return mapper.readTree(data);
+            }
+            catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private final class JsonSerializer implements Serializer<JsonNode> {
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+        }
+
+        @Override
+        public byte[] serialize(String topic, JsonNode data) {
+            try {
+                return mapper.writeValueAsBytes(data);
+            }
+            catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
diff --git a/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
new file mode 100644
index 0000000000000..566aa9a47604f
--- /dev/null
+++ b/integration-tests/kafka/src/main/java/io/quarkus/it/kafka/KafkaStreamsPipeline.java
@@ -0,0 +1,104 @@
+package io.quarkus.it.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.event.Observes;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import io.quarkus.runtime.StartupEvent;
+
+@ApplicationScoped
+public class KafkaStreamsPipeline {
+
+    private KafkaStreams streams;
+
+    private ExecutorService executor;
+
+    void onStart(@Observes StartupEvent ev) {
+        Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-test-pipeline");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500);
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        StreamsBuilder builder = new StreamsBuilder();
+
+        JsonNodeSerde jsonNodeSerde = new JsonNodeSerde();
+
+        KTable<Integer, JsonNode> categories = builder.table("streams-test-categories", Consumed.with(Serdes.Integer(), jsonNodeSerde));
+
+        builder.stream("streams-test-customers", Consumed.with(Serdes.Integer(), jsonNodeSerde))
+                .selectKey((k, v) -> v.get("category").asInt())
+                .join(
+                        categories,
+                        (v1, v2) -> {
+                            ((ObjectNode) v1).replace("category", v2);
+                            return v1;
+                        },
+                        Joined.with(Serdes.Integer(), jsonNodeSerde, null)
+                )
+                .selectKey((k, v) -> v.get("id").asInt())
+                .to("streams-test-customers-processed", Produced.with(Serdes.Integer(), jsonNodeSerde));
+
+        streams = new KafkaStreams(builder.build(), props);
+
+        executor = Executors.newSingleThreadExecutor();
+        executor.execute(() -> {
+            waitForTopicsToBeCreated("localhost:19092");
+            streams.start();
+        });
+    }
+
+    public void stop() {
+        streams.close();
+    }
+    private void waitForTopicsToBeCreated(String bootstrapServers) {
+        Map<String, Object> config = new HashMap<>();
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        try (AdminClient adminClient = AdminClient.create(config)) {
+            AtomicBoolean topicsCreated = new AtomicBoolean(false);
+
+            while (topicsCreated.get() == false) {
+                ListTopicsResult topics = adminClient.listTopics();
+                topics.names().whenComplete((t, e) -> {
+                    if (e != null) {
+                        throw new RuntimeException(e);
+                    }
+                    else if (t.contains("streams-test-categories") && t.contains("streams-test-customers")) {
+                        topicsCreated.set(true);
+                    }
+                });
+
+                try {
+                    Thread.sleep(1000);
+                }
+                catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java
new file mode 100644
index 0000000000000..a2d05684cf7d5
--- /dev/null
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsITCase.java
@@ -0,0 +1,8 @@
+package io.quarkus.it.main;
+
+import io.quarkus.test.junit.SubstrateTest;
+
+@SubstrateTest
+public class KafkaStreamsITCase extends KafkaStreamsTest {
+
+}
diff --git a/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
new file mode 100644
index 0000000000000..4308bc10b6543
--- /dev/null
+++ b/integration-tests/kafka/src/test/java/io/quarkus/it/main/KafkaStreamsTest.java
@@ -0,0 +1,109 @@
+package io.quarkus.it.main;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import javax.inject.Inject;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.it.kafka.KafkaStreamsPipeline;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+
+@QuarkusTestResource(KafkaTestResource.class)
+@QuarkusTest
+public class KafkaStreamsTest {
+
+    @Inject
+    private KafkaStreamsPipeline pipeline;
+
+    private static Producer<Integer, String> createProducer() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "streams-test-producer");
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+
+        return new KafkaProducer<Integer, String>(props);
+    }
+
+    private static KafkaConsumer<Integer, String> createConsumer() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-test-consumer");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Collections.singletonList("streams-test-customers-processed"));
+        return consumer;
+    }
+
+    @AfterEach
+    public void stopKafkaStreams() {
+        // explicitly stopping the pipeline *before* the broker is shut down, as it
+        // otherwise will time out
+        pipeline.stop();
+    }
+
+    @Test
+    public void test() throws Exception {
+        Producer<Integer, String> producer = createProducer();
+
+        producer.send(new ProducerRecord<>("streams-test-categories", 1, "{ \"name\" : \"B2B\", \"value\" : \"business-to-business\" }"));
+        producer.send(new ProducerRecord<>("streams-test-categories", 2, "{ \"name\" : \"B2C\", \"value\" : \"business-to-customer\" }"));
+
+        producer.send(new ProducerRecord<>("streams-test-customers", 101, "{ \"id\" : 101, \"name\" : \"Bob\", \"category\" : 1 }"));
+        producer.send(new ProducerRecord<>("streams-test-customers", 102, "{ \"id\" : 102, \"name\" : \"Becky\", \"category\" : 2 }"));
+        producer.send(new ProducerRecord<>("streams-test-customers", 103, "{ \"id\" : 103, \"name\" : \"Bruce\", \"category\" : 1 }"));
+
+        Consumer<Integer, String> consumer = createConsumer();
+        List<ConsumerRecord<Integer, String>> records = poll(consumer, 3);
+
+        ConsumerRecord<Integer, String> record = records.get(0);
+        Assertions.assertEquals(101, record.key());
+        Assertions.assertEquals("{\"id\":101,\"name\":\"Bob\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", record.value());
+
+        record = records.get(1);
+        Assertions.assertEquals(102, record.key());
+        Assertions.assertEquals("{\"id\":102,\"name\":\"Becky\",\"category\":{\"name\":\"B2C\",\"value\":\"business-to-customer\"}}", record.value());
+
+        record = records.get(2);
+        Assertions.assertEquals(103, record.key());
+        Assertions.assertEquals("{\"id\":103,\"name\":\"Bruce\",\"category\":{\"name\":\"B2B\",\"value\":\"business-to-business\"}}", record.value());
+    }
+
+    private List<ConsumerRecord<Integer, String>> poll(Consumer<Integer, String> consumer, int expectedRecordCount) {
+        int fetched = 0;
+        List<ConsumerRecord<Integer, String>> result = new ArrayList<>();
+        while (fetched < expectedRecordCount) {
+            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(20000));
+            records.forEach(result::add);
+            fetched = result.size();
+        }
+
+        return result;
+    }
+
+}