From a17b774adcc025b526149dd4865ea2ac74dba6e7 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Wed, 23 Aug 2023 16:07:19 -0500 Subject: [PATCH 01/13] Multi-language docs examples for Kafka Streams --- src/main/docs/guide/kafkaStreams.adoc | 23 +---- test-suite-groovy/build.gradle.kts | 1 + .../kafka/docs/streams/WordCountClient.groovy | 17 ++++ .../docs/streams/WordCountListener.groovy | 35 +++++++ .../kafka/docs/streams/WordCountStream.groovy | 89 +++++++++++++++++ test-suite-kotlin/build.gradle.kts | 1 + .../kafka/docs/streams/WordCountClient.kt | 16 ++++ .../kafka/docs/streams/WordCountListener.kt | 33 +++++++ .../kafka/docs/streams/WordCountStream.kt | 95 +++++++++++++++++++ test-suite/build.gradle.kts | 1 + .../kafka/docs/streams/WordCountClient.java | 17 ++++ .../kafka/docs/streams/WordCountListener.java | 37 ++++++++ .../kafka/docs/streams/WordCountStream.java | 92 ++++++++++++++++++ 13 files changed, 439 insertions(+), 18 deletions(-) create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java diff --git a/src/main/docs/guide/kafkaStreams.adoc b/src/main/docs/guide/kafkaStreams.adoc index 1afb565e2..9e9c2922f 100644 --- a/src/main/docs/guide/kafkaStreams.adoc +++ b/src/main/docs/guide/kafkaStreams.adoc @@ -32,18 +32,9 @@ kafka: You should then define a ann:context.annotation.Factory[] for your streams that defines beans that return a `KStream`. For example to implement the Word Count example from the Kafka Streams documentation: - .Kafka Streams Word Count -[source,java] ----- -include::{testskafkastreams}/wordcount/WordCountStream.java[tags=imports, indent=0] - -include::{testskafkastreams}/wordcount/WordCountStream.java[tags=clazz, indent=0] -include::{testskafkastreams}/wordcount/WordCountStream.java[tags=wordCountStream, indent=4] - -} ----- +snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="imports, clazz, wordCountStream", indent=0] <1> The input topic <2> The output topic @@ -55,18 +46,14 @@ NOTE: With Kafka streams the key and value `Serdes` (serializer/deserializer) mu You can use the ann:configuration.kafka.annotation.KafkaClient[] annotation to send a sentence to be processed by the above stream: .Defining a Kafka Client -[source,java] ----- -include::{testskafkastreams}/wordcount/WordCountClient.java[] ----- + +snippet::io.micronaut.kafka.docs.streams.WordCountClient[tags="imports, clazz", indent=0] You can also define a ann:configuration.kafka.annotation.KafkaListener[] to listen for the result of the word count stream: .Defining a Kafka Listener -[source,java] ----- -include::{testskafkastreams}/wordcount/WordCountListener.java[] ----- + +snippet::io.micronaut.kafka.docs.streams.WordCountListener[tags="imports, clazz", indent=0] == Configuring Kafka Streams diff --git a/test-suite-groovy/build.gradle.kts b/test-suite-groovy/build.gradle.kts index 95eb88570..4e6faf77e 100644 --- a/test-suite-groovy/build.gradle.kts +++ b/test-suite-groovy/build.gradle.kts @@ -15,4 +15,5 @@ dependencies { testImplementation(mnSerde.micronaut.serde.jackson) testImplementation(mnRxjava2.micronaut.rxjava2) testImplementation(projects.micronautKafka) + testImplementation(projects.micronautKafkaStreams) } diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy new file mode 100644 index 000000000..0de2f9c35 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy @@ -0,0 +1,17 @@ +package io.micronaut.kafka.docs.streams + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +// end::imports[] + +@Requires(property = 'spec.name', value = 'WordCountStreamTest') +// tag::clazz[] +@KafkaClient +interface WordCountClient { + + @Topic(WordCountStream.INPUT) + void publishSentence(String sentence) +} +// end::clazz[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy new file mode 100644 index 000000000..83b49c972 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy @@ -0,0 +1,35 @@ +package io.micronaut.kafka.docs.streams + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires + +import java.util.concurrent.ConcurrentHashMap + +import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST +// end::imports[] + +@Requires(property = 'spec.name', value = 'WordCountStreamTest') +// tag::clazz[] +@KafkaListener(offsetReset = EARLIEST, groupId = 'WordCountListener') +class WordCountListener { + + private final Map wordCounts = new ConcurrentHashMap<>() + + @Topic(WordCountStream.OUTPUT) + void count(@KafkaKey String word, long count) { + wordCounts.put(word, count) + } + + long getCount(String word) { + Long num = wordCounts.get(word) + num ?: 0 + } + + Map getWordCounts() { + Collections.unmodifiableMap(wordCounts) + } +} +// end::clazz[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy new file mode 100644 index 000000000..9e67b982d --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy @@ -0,0 +1,89 @@ +package io.micronaut.kafka.docs.streams + + +// tag::imports[] +import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Named +import jakarta.inject.Singleton +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.kstream.Grouped +import org.apache.kafka.streams.kstream.KStream +import org.apache.kafka.streams.kstream.KTable +import org.apache.kafka.streams.kstream.Materialized +import org.apache.kafka.streams.kstream.Produced + +// end::imports[] + +@Requires(property = 'spec.name', value = 'WordCountStreamTest') +// tag::clazz[] +@Factory +class WordCountStream { + + public static final String STREAM_WORD_COUNT = 'word-count' + public static final String INPUT = 'streams-plaintext-input' // <1> + public static final String OUTPUT = 'streams-wordcount-output' // <2> + public static final String WORD_COUNT_STORE = 'word-count-store' +// end::clazz[] + + // tag::wordCountStream[] + @Singleton + @Named(STREAM_WORD_COUNT) + KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3> + // set default serdes + Properties props = builder.getConfiguration(); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest') + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500') + + KStream source = builder.stream(INPUT) + + KTable groupedByWord = source + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) + //Store the result in a store for lookup later + .count(Materialized.as(WORD_COUNT_STORE)) // <4> + + groupedByWord + //convert to stream + .toStream() + //send to output using specific serdes + .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long())) + + return source + } + // end::wordCountStream[] + + // tag::namedStream[] + public static final String MY_STREAM = 'my-stream' + public static final String NAMED_WORD_COUNT_INPUT = 'named-word-count-input' + public static final String NAMED_WORD_COUNT_OUTPUT = 'named-word-count-output' + + @Singleton + @Named(MY_STREAM) + KStream myStream( + @Named(MY_STREAM) ConfiguredStreamBuilder builder) { + + // end::namedStream[] + // set default serdes + Properties props = builder.getConfiguration() + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest') + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500') + + KStream source = builder.stream(NAMED_WORD_COUNT_INPUT) + KTable counts = source + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) + .groupBy((key, value) -> value) + .count() + + // need to override value serde to Long type + counts.toStream().to(NAMED_WORD_COUNT_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); + return source + } +} diff --git a/test-suite-kotlin/build.gradle.kts b/test-suite-kotlin/build.gradle.kts index cf993cd09..2879de186 100644 --- a/test-suite-kotlin/build.gradle.kts +++ b/test-suite-kotlin/build.gradle.kts @@ -17,4 +17,5 @@ dependencies { testImplementation(mnSerde.micronaut.serde.jackson) testImplementation(mnRxjava2.micronaut.rxjava2) testImplementation(projects.micronautKafka) + testImplementation(projects.micronautKafkaStreams) } diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt new file mode 100644 index 000000000..b1f5e5c4b --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt @@ -0,0 +1,16 @@ +package io.micronaut.kafka.docs.streams + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaClient +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +@KafkaClient +interface WordCountClient { + @Topic(WordCountStream.INPUT) + fun publishSentence(sentence: String?) +} +// tag::clazz[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt new file mode 100644 index 000000000..86e2ad795 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt @@ -0,0 +1,33 @@ +package io.micronaut.kafka.docs.streams + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaKey +import io.micronaut.configuration.kafka.annotation.KafkaListener +import io.micronaut.configuration.kafka.annotation.OffsetReset +import io.micronaut.configuration.kafka.annotation.Topic +import io.micronaut.context.annotation.Requires +import java.util.* +import java.util.concurrent.ConcurrentHashMap +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +@KafkaListener(offsetReset = OffsetReset.EARLIEST, groupId = "WordCountListener") +class WordCountListener { + + private val wordCounts: MutableMap = ConcurrentHashMap() + @Topic(WordCountStream.OUTPUT) + fun count(@KafkaKey word: String, count: Long) { + wordCounts[word] = count + } + + fun getCount(word: String): Long { + val num = wordCounts[word] + return num ?: 0 + } + + fun getWordCounts(): Map { + return Collections.unmodifiableMap(wordCounts) + } +} +// end::clazz[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt new file mode 100644 index 000000000..4c56d699c --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt @@ -0,0 +1,95 @@ +package io.micronaut.kafka.docs.streams + +// tag::imports[] +import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Named +import jakarta.inject.Singleton +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.Serdes +import org.apache.kafka.streams.StreamsConfig +import org.apache.kafka.streams.kstream.Grouped +import org.apache.kafka.streams.kstream.KStream +import org.apache.kafka.streams.kstream.Materialized +import org.apache.kafka.streams.kstream.Produced +import java.util.* +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +@Factory +class WordCountStream { + companion object { + const val STREAM_WORD_COUNT = "word-count" + const val INPUT = "streams-plaintext-input" // <1> + const val OUTPUT = "streams-wordcount-output" // <2> + const val WORD_COUNT_STORE = "word-count-store" + + const val MY_STREAM = "my-stream" + const val NAMED_WORD_COUNT_INPUT = "named-word-count-input" + const val NAMED_WORD_COUNT_OUTPUT = "named-word-count-output" + } + // end::clazz[] + + // tag::wordCountStream[] + @Singleton + @Named(STREAM_WORD_COUNT) + fun wordCountStream(builder: ConfiguredStreamBuilder): KStream { // <3> + // set default serdes + val props = builder.configuration + props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() + props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = "500" + val source = builder.stream(INPUT) + val groupedByWord = source + .flatMapValues { value: String -> + Arrays.asList( + *value.lowercase(Locale.getDefault()).split("\\W+".toRegex()).dropLastWhile { it.isEmpty() } + .toTypedArray()) + } + .groupBy( + { key: String?, word: String? -> word }, + Grouped.with(Serdes.String(), Serdes.String()) + ) + //Store the result in a store for lookup later + .count(Materialized.`as`(WORD_COUNT_STORE)) // <4> + groupedByWord + //convert to stream + .toStream() + //send to output using specific serdes + .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long())) + return source + } + // end::wordCountStream[] + + // tag::namedStream[] + @Singleton + @Named(MY_STREAM) + fun myStream( + @Named(MY_STREAM) builder: ConfiguredStreamBuilder + ): KStream { + + // end::namedStream[] + // set default serdes + val props = builder.configuration + props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() + props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() + props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" + props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = "500" + val source = builder.stream(NAMED_WORD_COUNT_INPUT) + val counts = source + .flatMapValues { value: String -> + Arrays.asList( + *value.lowercase(Locale.getDefault()).split(" ".toRegex()).dropLastWhile { it.isEmpty() } + .toTypedArray()) + } + .groupBy { key: String?, value: String? -> value } + .count() + + // need to override value serde to Long type + counts.toStream().to(NAMED_WORD_COUNT_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())) + return source + } +} diff --git a/test-suite/build.gradle.kts b/test-suite/build.gradle.kts index d466d8bcb..3de0837d1 100644 --- a/test-suite/build.gradle.kts +++ b/test-suite/build.gradle.kts @@ -16,4 +16,5 @@ dependencies { testImplementation(mnSerde.micronaut.serde.jackson) testImplementation(mnRxjava2.micronaut.rxjava2) testImplementation(projects.micronautKafka) + testImplementation(projects.micronautKafkaStreams) } diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java new file mode 100644 index 000000000..74bca6eca --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java @@ -0,0 +1,17 @@ +package io.micronaut.kafka.docs.streams; + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaClient; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +@KafkaClient +public interface WordCountClient { + + @Topic(WordCountStream.INPUT) + void publishSentence(String sentence); +} +// end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java new file mode 100644 index 000000000..90bcc0799 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java @@ -0,0 +1,37 @@ +package io.micronaut.kafka.docs.streams; + +// tag::imports[] +import io.micronaut.configuration.kafka.annotation.KafkaKey; +import io.micronaut.configuration.kafka.annotation.KafkaListener; +import io.micronaut.configuration.kafka.annotation.Topic; +import io.micronaut.context.annotation.Requires; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST; +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +@KafkaListener(offsetReset = EARLIEST, groupId = "WordCountListener") +public class WordCountListener { + + private final Map wordCounts = new ConcurrentHashMap<>(); + + @Topic(WordCountStream.OUTPUT) + void count(@KafkaKey String word, long count) { + wordCounts.put(word, count); + } + + public long getCount(String word) { + Long num = wordCounts.get(word); + return num != null ? num : 0; + } + + public Map getWordCounts() { + return Collections.unmodifiableMap(wordCounts); + } +} +// end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java new file mode 100644 index 000000000..2a8d175fe --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java @@ -0,0 +1,92 @@ +package io.micronaut.kafka.docs.streams; + +// tag::imports[] +import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Properties; +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +@Factory +public class WordCountStream { + + public static final String STREAM_WORD_COUNT = "word-count"; + public static final String INPUT = "streams-plaintext-input"; // <1> + public static final String OUTPUT = "streams-wordcount-output"; // <2> + public static final String WORD_COUNT_STORE = "word-count-store"; + +// end::clazz[] + + // tag::wordCountStream[] + @Singleton + @Named(STREAM_WORD_COUNT) + KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3> + // set default serdes + Properties props = builder.getConfiguration(); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500"); + + KStream source = builder.stream(INPUT); + + KTable groupedByWord = source + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) + //Store the result in a store for lookup later + .count(Materialized.as(WORD_COUNT_STORE)); // <4> + + groupedByWord + //convert to stream + .toStream() + //send to output using specific serdes + .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); + + return source; + } + // end::wordCountStream[] + + // tag::namedStream[] + public static final String MY_STREAM = "my-stream"; + public static final String NAMED_WORD_COUNT_INPUT = "named-word-count-input"; + public static final String NAMED_WORD_COUNT_OUTPUT = "named-word-count-output"; + + @Singleton + @Named(MY_STREAM) + KStream myStream( + @Named(MY_STREAM) ConfiguredStreamBuilder builder) { + + // end::namedStream[] + // set default serdes + Properties props = builder.getConfiguration(); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500"); + + KStream source = builder.stream(NAMED_WORD_COUNT_INPUT); + KTable counts = source + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) + .groupBy((key, value) -> value) + .count(); + + // need to override value serde to Long type + counts.toStream().to(NAMED_WORD_COUNT_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); + return source; + } +} From 119b45cf057d6bcae3581004cf3cccc4776ab478 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Wed, 23 Aug 2023 16:18:22 -0500 Subject: [PATCH 02/13] Multi-language docs examples for Streams InteractiveQueryService --- .../kafkaStreamInteractiveQuery.adoc | 5 +- .../InteractiveQueryServiceExample.groovy | 62 ++++++++++++++++++ .../streams/InteractiveQueryServiceExample.kt | 60 ++++++++++++++++++ .../InteractiveQueryServiceExample.java | 63 +++++++++++++++++++ 4 files changed, 186 insertions(+), 4 deletions(-) create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java diff --git a/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc b/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc index a8daedbc7..db3fb75ca 100644 --- a/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc +++ b/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc @@ -9,7 +9,4 @@ include::{kafkastreams}/InteractiveQueryService.java[tags=getQueryableStore, ind An example service that wraps the `InteractiveQueryService` is included below. This is here to illustrate that when calling the `getQueryableStore` method you must provide the store name and preferably the type of key and value you are trying to retrieve. -[source,java] ----- -include::{testskafkastreams}/wordcount/InteractiveQueryServiceExample.java[] ----- \ No newline at end of file +snippet::io.micronaut.kafka.docs.streams.InteractiveQueryServiceExample[tags="imports, clazz", indent=0] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy new file mode 100644 index 000000000..4a77fcb59 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy @@ -0,0 +1,62 @@ +package io.micronaut.kafka.docs.streams; + +// tag::imports[] +import io.micronaut.configuration.kafka.streams.InteractiveQueryService +import io.micronaut.context.annotation.Requires; +import jakarta.inject.Singleton; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +// end::imports[] + + +@Requires(property = 'spec.name', value = 'WordCountStreamTest') +// tag::clazz[] +/** + * Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example. + */ +@Singleton +class InteractiveQueryServiceExample { + + private final InteractiveQueryService interactiveQueryService; + + InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) { + this.interactiveQueryService = interactiveQueryService; + } + + /** + * Method to get the word state store and word count from the store using the interactive query service. + * + * @param stateStore the name of the state store ie "foo-store" + * @param word the key to get, in this case the word as the stream and ktable have been grouped by word + * @return the Long count of the word in the store + */ + Long getWordCount(String stateStore, String word) { + Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(word)).orElse(0L); + } + + /** + * Method to get byte array from a state store using the interactive query service. + * + * @param stateStore the name of the state store ie "bar-store" + * @param blobName the key to get, in this case the name of the blob + * @return the byte[] stored in the state store + */ + byte[] getBytes(String stateStore, String blobName) { + Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(stringReadOnlyKeyValueStore -> stringReadOnlyKeyValueStore.get(blobName)).orElse(null); + } + + /** + * Method to get value V by key K. + * + * @param stateStore the name of the state store ie "baz-store" + * @param name the key to get + * @return the value of type V stored in the state store + */ + V getGenericKeyValue(String stateStore, K name) { + Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(name)).orElse(null); + } +} +// end::clazz[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt new file mode 100644 index 000000000..23b30f15a --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt @@ -0,0 +1,60 @@ +package io.micronaut.kafka.docs.streams + +// tag::imports[] +import io.micronaut.configuration.kafka.streams.InteractiveQueryService +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton +import org.apache.kafka.streams.state.QueryableStoreTypes +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +/** + * Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example. + */ +@Singleton +class InteractiveQueryServiceExample(private val interactiveQueryService: InteractiveQueryService) { + /** + * Method to get the word state store and word count from the store using the interactive query service. + * + * @param stateStore the name of the state store ie "foo-store" + * @param word the key to get, in this case the word as the stream and ktable have been grouped by word + * @return the Long count of the word in the store + */ + fun getWordCount(stateStore: String?, word: String): Long { + val queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()) + return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore -> + kvReadOnlyKeyValueStore[word] }.orElse(0L) + } + + /** + * Method to get byte array from a state store using the interactive query service. + * + * @param stateStore the name of the state store ie "bar-store" + * @param blobName the key to get, in this case the name of the blob + * @return the byte[] stored in the state store + */ + fun getBytes(stateStore: String?, blobName: String): ByteArray? { + val queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()) + return queryableStore.map { stringReadOnlyKeyValueStore: ReadOnlyKeyValueStore -> + stringReadOnlyKeyValueStore[blobName] }.orElse(null) + } + + /** + * Method to get value V by key K. + * + * @param stateStore the name of the state store ie "baz-store" + * @param name the key to get + * @return the value of type V stored in the state store + */ + fun getGenericKeyValue(stateStore: String?, name: K): V? { + val queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()) + return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore -> + kvReadOnlyKeyValueStore[name] }.orElse(null) + } +} +// end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java new file mode 100644 index 000000000..cfe649299 --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java @@ -0,0 +1,63 @@ +package io.micronaut.kafka.docs.streams; + +// tag::imports[] +import io.micronaut.configuration.kafka.streams.InteractiveQueryService; +import io.micronaut.context.annotation.Requires; +import jakarta.inject.Singleton; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; + +import java.util.Optional; +// end::imports[] + +@Requires(property = "spec.name", value = "WordCountStreamTest") +// tag::clazz[] +/** + * Example service that uses the InteractiveQueryService in a reusable way. This is only intended as an example. + */ +@Singleton +public class InteractiveQueryServiceExample { + + private final InteractiveQueryService interactiveQueryService; + + public InteractiveQueryServiceExample(InteractiveQueryService interactiveQueryService) { + this.interactiveQueryService = interactiveQueryService; + } + + /** + * Method to get the word state store and word count from the store using the interactive query service. + * + * @param stateStore the name of the state store ie "foo-store" + * @param word the key to get, in this case the word as the stream and ktable have been grouped by word + * @return the Long count of the word in the store + */ + public Long getWordCount(String stateStore, String word) { + Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(word)).orElse(0L); + } + + /** + * Method to get byte array from a state store using the interactive query service. + * + * @param stateStore the name of the state store ie "bar-store" + * @param blobName the key to get, in this case the name of the blob + * @return the byte[] stored in the state store + */ + public byte[] getBytes(String stateStore, String blobName) { + Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(stringReadOnlyKeyValueStore -> stringReadOnlyKeyValueStore.get(blobName)).orElse(null); + } + + /** + * Method to get value V by key K. + * + * @param stateStore the name of the state store ie "baz-store" + * @param name the key to get + * @return the value of type V stored in the state store + */ + public V getGenericKeyValue(String stateStore, K name) { + Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(name)).orElse(null); + } +} +// end::clazz[] From fac9d6d48a15328912b466e13afa6484deed4aab Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Thu, 24 Aug 2023 18:35:20 +0200 Subject: [PATCH 03/13] Add dummy kafka stream factories so that the app context can be started --- .../kafka/docs/streams/NoOpStreamFactory.groovy | 17 +++++++++++++++++ .../kafka/docs/streams/NoOpStreamFactory.kt | 16 ++++++++++++++++ .../kafka/docs/streams/NoOpStreamFactory.java | 17 +++++++++++++++++ 3 files changed, 50 insertions(+) create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/NoOpStreamFactory.groovy create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/NoOpStreamFactory.kt create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/NoOpStreamFactory.java diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/NoOpStreamFactory.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/NoOpStreamFactory.groovy new file mode 100644 index 000000000..c106dab72 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/NoOpStreamFactory.groovy @@ -0,0 +1,17 @@ +package io.micronaut.kafka.docs.streams + +import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder +import io.micronaut.context.annotation.Factory +import jakarta.inject.Named +import jakarta.inject.Singleton +import org.apache.kafka.streams.kstream.KStream + +@Factory +class NoOpStreamFactory { + + @Singleton + @Named("no-op-stream") + KStream noOpStream(ConfiguredStreamBuilder builder) { + return builder.stream("no-op-input") + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/NoOpStreamFactory.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/NoOpStreamFactory.kt new file mode 100644 index 000000000..3f228e9d8 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/NoOpStreamFactory.kt @@ -0,0 +1,16 @@ +package io.micronaut.kafka.docs.streams + +import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder +import io.micronaut.context.annotation.Factory +import jakarta.inject.Named +import jakarta.inject.Singleton +import org.apache.kafka.streams.kstream.KStream + +@Factory +class NoOpStreamFactory { + @Singleton + @Named("no-op-stream") + fun noOpStream(builder: ConfiguredStreamBuilder): KStream { + return builder.stream("no-op-input") + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/NoOpStreamFactory.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/NoOpStreamFactory.java new file mode 100644 index 000000000..e5f238aee --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/NoOpStreamFactory.java @@ -0,0 +1,17 @@ +package io.micronaut.kafka.docs.streams; + +import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder; +import io.micronaut.context.annotation.Factory; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import org.apache.kafka.streams.kstream.KStream; + +@Factory +public class NoOpStreamFactory { + + @Singleton + @Named("no-op-stream") + KStream noOpStream(ConfiguredStreamBuilder builder) { + return builder.stream("no-op-input"); + } +} From da1b7d73932d374279abd07656dae9f8d223ed78 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Thu, 24 Aug 2023 15:35:28 -0500 Subject: [PATCH 04/13] Some cleanup and refactoring to make the docs snippets display in a sensible way. --- src/main/docs/guide/kafkaStreams.adoc | 23 +++----- .../kafkaStreamInteractiveQuery.adoc | 6 +- .../InteractiveQueryServiceExample.groovy | 18 ++++-- .../kafka/docs/streams/WordCountClient.groovy | 2 +- .../docs/streams/WordCountListener.groovy | 2 +- .../kafka/docs/streams/WordCountStream.groovy | 56 +++++++++---------- .../streams/InteractiveQueryServiceExample.kt | 6 +- .../kafka/docs/streams/WordCountClient.kt | 3 +- .../kafka/docs/streams/WordCountListener.kt | 3 +- .../kafka/docs/streams/WordCountStream.kt | 40 ++++++------- .../InteractiveQueryServiceExample.java | 18 ++++-- .../kafka/docs/streams/WordCountClient.java | 2 +- .../kafka/docs/streams/WordCountListener.java | 2 +- .../kafka/docs/streams/WordCountStream.java | 55 +++++++++--------- 14 files changed, 114 insertions(+), 122 deletions(-) diff --git a/src/main/docs/guide/kafkaStreams.adoc b/src/main/docs/guide/kafkaStreams.adoc index 9e9c2922f..163737c20 100644 --- a/src/main/docs/guide/kafkaStreams.adoc +++ b/src/main/docs/guide/kafkaStreams.adoc @@ -34,12 +34,12 @@ You should then define a ann:context.annotation.Factory[] for your streams that .Kafka Streams Word Count -snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="imports, clazz, wordCountStream", indent=0] +snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="imports, clazz, wordCountStream"] -<1> The input topic -<2> The output topic -<3> An instance of api:configuration.kafka.streams.ConfiguredStreamBuilder[] is injected that allows mutating the configuration -<4> Materialize the count stream and save to a state store +<1> An instance of api:configuration.kafka.streams.ConfiguredStreamBuilder[] is injected that allows mutating the configuration +<2> The input topic +<3> Materialize the count stream and save to a state store +<4> The output topic NOTE: With Kafka streams the key and value `Serdes` (serializer/deserializer) must be classes with a zero argument constructor. If you wish to use JSON (de)serialization you can subclass api:configuration.kafka.serde.JsonObjectSerde[] to define your `Serdes` @@ -96,20 +96,13 @@ The above configuration sets the `num.stream.threads` setting of the Kafka `Stre You can then inject an `api:configuration.kafka.streams.ConfiguredStreamBuilder[]` specifically for the above configuration using `jakarta.inject.Named`: -[source,java] ----- -@Named("my-other-stream") -KStream myOtherKStream(@Named("my-other-stream") ConfiguredStreamBuilder builder) {...} ----- +snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="clazz, myOtherStream"] NOTE: If you do not provide a `@Named` on the `ConfiguredStreamBuilder` you have multiple KStreams defined that share the default configurations like client id, application id, etc.It is advisable when using multiple streams in a single app to provide a `@Named` instance of `ConfiguredStreamBuilder` for each stream. .Kafka Streams Word Count -[source,java] ----- -include::{testskafkastreams}/wordcount/WordCountStream.java[tags=namedStream,indent=0] -} ----- + +snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="clazz, namedStream"] .Configuring Kafka Streams for testing When writing a test without starting the actual Kafka server, you can instruct Micronaut not to start Kafka Streams. To do this create a config file suffixed with an environment name, such as `application-test.yml` and set the `kafka.streams.[STREAM-NAME].start-kafka-streams` to `false`. diff --git a/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc b/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc index db3fb75ca..fffb5c451 100644 --- a/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc +++ b/src/main/docs/guide/kafkaStreams/kafkaStreamInteractiveQuery.adoc @@ -1,11 +1,7 @@ When using streams you can set a state store for your stream using a store builder and telling the stream to store its data. In the above example for the Kafka Streams Word Count, the output is materialized to a named store that can later be retrieved via the Interactive Query Service. Apache Kafka docs https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html#querying-local-key-value-stores[available here]. -You can inject the `InteractiveQueryService` and use the method `getQueryableStore` to get values from a state store. +You can inject the api:configuration.kafka.streams.InteractiveQueryService[] and use the method `getQueryableStore(String storeName, QueryableStoreType storeType)` to get values from a state store. -[source,java] ----- -include::{kafkastreams}/InteractiveQueryService.java[tags=getQueryableStore, indent=0] ----- An example service that wraps the `InteractiveQueryService` is included below. This is here to illustrate that when calling the `getQueryableStore` method you must provide the store name and preferably the type of key and value you are trying to retrieve. diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy index 4a77fcb59..44a2471da 100644 --- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.groovy @@ -31,8 +31,10 @@ class InteractiveQueryServiceExample { * @return the Long count of the word in the store */ Long getWordCount(String stateStore, String word) { - Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); - return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(word)).orElse(0L); + Optional> queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> + kvReadOnlyKeyValueStore.get(word)).orElse(0L); } /** @@ -43,8 +45,10 @@ class InteractiveQueryServiceExample { * @return the byte[] stored in the state store */ byte[] getBytes(String stateStore, String blobName) { - Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); - return queryableStore.map(stringReadOnlyKeyValueStore -> stringReadOnlyKeyValueStore.get(blobName)).orElse(null); + Optional> queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(stringReadOnlyKeyValueStore -> + stringReadOnlyKeyValueStore.get(blobName)).orElse(null); } /** @@ -55,8 +59,10 @@ class InteractiveQueryServiceExample { * @return the value of type V stored in the state store */ V getGenericKeyValue(String stateStore, K name) { - Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); - return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(name)).orElse(null); + Optional> queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> + kvReadOnlyKeyValueStore.get(name)).orElse(null); } } // end::clazz[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy index 0de2f9c35..c88a3e2b8 100644 --- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountClient.groovy @@ -11,7 +11,7 @@ import io.micronaut.context.annotation.Requires @KafkaClient interface WordCountClient { - @Topic(WordCountStream.INPUT) + @Topic("streams-plaintext-input") void publishSentence(String sentence) } // end::clazz[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy index 83b49c972..5af9ef690 100644 --- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountListener.groovy @@ -18,7 +18,7 @@ class WordCountListener { private final Map wordCounts = new ConcurrentHashMap<>() - @Topic(WordCountStream.OUTPUT) + @Topic("streams-wordcount-output") void count(@KafkaKey String word, long count) { wordCounts.put(word, count) } diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy index 9e67b982d..7f30a9039 100644 --- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy @@ -1,6 +1,5 @@ package io.micronaut.kafka.docs.streams - // tag::imports[] import io.micronaut.configuration.kafka.streams.ConfiguredStreamBuilder import io.micronaut.context.annotation.Factory @@ -15,24 +14,18 @@ import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.streams.kstream.KTable import org.apache.kafka.streams.kstream.Materialized import org.apache.kafka.streams.kstream.Produced - // end::imports[] @Requires(property = 'spec.name', value = 'WordCountStreamTest') // tag::clazz[] @Factory class WordCountStream { - - public static final String STREAM_WORD_COUNT = 'word-count' - public static final String INPUT = 'streams-plaintext-input' // <1> - public static final String OUTPUT = 'streams-wordcount-output' // <2> - public static final String WORD_COUNT_STORE = 'word-count-store' // end::clazz[] // tag::wordCountStream[] @Singleton - @Named(STREAM_WORD_COUNT) - KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3> + @Named('word-count') + KStream wordCountStream(ConfiguredStreamBuilder builder) { // <1> // set default serdes Properties props = builder.getConfiguration(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()) @@ -40,33 +33,28 @@ class WordCountStream { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest') props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500') - KStream source = builder.stream(INPUT) + KStream source = builder.stream('streams-plaintext-input') // <2> KTable groupedByWord = source - .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) - //Store the result in a store for lookup later - .count(Materialized.as(WORD_COUNT_STORE)) // <4> + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) + //Store the result in a store for lookup later + .count(Materialized.as('word-count-store')) // <3> groupedByWord - //convert to stream - .toStream() - //send to output using specific serdes - .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long())) + //convert to stream + .toStream() + //send to output using specific serdes + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // <4> return source } // end::wordCountStream[] // tag::namedStream[] - public static final String MY_STREAM = 'my-stream' - public static final String NAMED_WORD_COUNT_INPUT = 'named-word-count-input' - public static final String NAMED_WORD_COUNT_OUTPUT = 'named-word-count-output' - @Singleton - @Named(MY_STREAM) - KStream myStream( - @Named(MY_STREAM) ConfiguredStreamBuilder builder) { + @Named('my-stream') + KStream myStream(@Named('my-stream') ConfiguredStreamBuilder builder) { // end::namedStream[] // set default serdes @@ -76,14 +64,22 @@ class WordCountStream { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest') props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, '500') - KStream source = builder.stream(NAMED_WORD_COUNT_INPUT) + KStream source = builder.stream("named-word-count-input") KTable counts = source - .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) - .groupBy((key, value) -> value) - .count() + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) + .groupBy((key, value) -> value) + .count() // need to override value serde to Long type - counts.toStream().to(NAMED_WORD_COUNT_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); + counts.toStream().to("named-word-count-output", Produced.with(Serdes.String(), Serdes.Long())); return source } + + // tag::myOtherStream[] + @Singleton + @Named('my-other-stream') + KStream myOtherKStream(@Named('my-other-stream') ConfiguredStreamBuilder builder) { + return builder.stream('my-other-stream') + } + // end::myOtherStream[] } diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt index 23b30f15a..19dca8230 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.kt @@ -22,7 +22,7 @@ class InteractiveQueryServiceExample(private val interactiveQueryService: Intera * @param word the key to get, in this case the word as the stream and ktable have been grouped by word * @return the Long count of the word in the store */ - fun getWordCount(stateStore: String?, word: String): Long { + fun getWordCount(stateStore: String, word: String): Long { val queryableStore = interactiveQueryService.getQueryableStore( stateStore, QueryableStoreTypes.keyValueStore()) return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore -> @@ -36,7 +36,7 @@ class InteractiveQueryServiceExample(private val interactiveQueryService: Intera * @param blobName the key to get, in this case the name of the blob * @return the byte[] stored in the state store */ - fun getBytes(stateStore: String?, blobName: String): ByteArray? { + fun getBytes(stateStore: String, blobName: String): ByteArray? { val queryableStore = interactiveQueryService.getQueryableStore( stateStore, QueryableStoreTypes.keyValueStore()) return queryableStore.map { stringReadOnlyKeyValueStore: ReadOnlyKeyValueStore -> @@ -50,7 +50,7 @@ class InteractiveQueryServiceExample(private val interactiveQueryService: Intera * @param name the key to get * @return the value of type V stored in the state store */ - fun getGenericKeyValue(stateStore: String?, name: K): V? { + fun getGenericKeyValue(stateStore: String, name: K): V { val queryableStore = interactiveQueryService.getQueryableStore( stateStore, QueryableStoreTypes.keyValueStore()) return queryableStore.map { kvReadOnlyKeyValueStore: ReadOnlyKeyValueStore -> diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt index b1f5e5c4b..9fdb40200 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountClient.kt @@ -10,7 +10,8 @@ import io.micronaut.context.annotation.Requires // tag::clazz[] @KafkaClient interface WordCountClient { - @Topic(WordCountStream.INPUT) + + @Topic("streams-plaintext-input") fun publishSentence(sentence: String?) } // tag::clazz[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt index 86e2ad795..6a183f930 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountListener.kt @@ -16,7 +16,8 @@ import java.util.concurrent.ConcurrentHashMap class WordCountListener { private val wordCounts: MutableMap = ConcurrentHashMap() - @Topic(WordCountStream.OUTPUT) + + @Topic("streams-wordcount-output") fun count(@KafkaKey word: String, count: Long) { wordCounts[word] = count } diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt index 4c56d699c..2de8ae40d 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt @@ -20,29 +20,19 @@ import java.util.* // tag::clazz[] @Factory class WordCountStream { - companion object { - const val STREAM_WORD_COUNT = "word-count" - const val INPUT = "streams-plaintext-input" // <1> - const val OUTPUT = "streams-wordcount-output" // <2> - const val WORD_COUNT_STORE = "word-count-store" - - const val MY_STREAM = "my-stream" - const val NAMED_WORD_COUNT_INPUT = "named-word-count-input" - const val NAMED_WORD_COUNT_OUTPUT = "named-word-count-output" - } // end::clazz[] // tag::wordCountStream[] @Singleton - @Named(STREAM_WORD_COUNT) - fun wordCountStream(builder: ConfiguredStreamBuilder): KStream { // <3> + @Named("word-count") + fun wordCountStream(builder: ConfiguredStreamBuilder): KStream { // <1> // set default serdes val props = builder.configuration props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = "500" - val source = builder.stream(INPUT) + val source = builder.stream("streams-plaintext-input") // <2> val groupedByWord = source .flatMapValues { value: String -> Arrays.asList( @@ -54,31 +44,29 @@ class WordCountStream { Grouped.with(Serdes.String(), Serdes.String()) ) //Store the result in a store for lookup later - .count(Materialized.`as`(WORD_COUNT_STORE)) // <4> + .count(Materialized.`as`("word-count-store")) // <3> groupedByWord //convert to stream .toStream() //send to output using specific serdes - .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long())) + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())) // <4> return source } // end::wordCountStream[] // tag::namedStream[] @Singleton - @Named(MY_STREAM) - fun myStream( - @Named(MY_STREAM) builder: ConfiguredStreamBuilder - ): KStream { - + @Named("my-stream") + fun myStream(@Named("my-stream") builder: ConfiguredStreamBuilder): KStream { // end::namedStream[] + // set default serdes val props = builder.configuration props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.getName() props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest" props[StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = "500" - val source = builder.stream(NAMED_WORD_COUNT_INPUT) + val source = builder.stream("named-word-count-input") val counts = source .flatMapValues { value: String -> Arrays.asList( @@ -89,7 +77,15 @@ class WordCountStream { .count() // need to override value serde to Long type - counts.toStream().to(NAMED_WORD_COUNT_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())) + counts.toStream().to("named-word-count-output", Produced.with(Serdes.String(), Serdes.Long())) return source } + + // tag::myOtherStream[] + @Singleton + @Named("my-other-stream") + fun myOtherKStream(@Named("my-other-stream") builder: ConfiguredStreamBuilder): KStream { + return builder.stream("my-other-stream") + } + // end::myOtherStream[] } diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java index cfe649299..61f0caedb 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/InteractiveQueryServiceExample.java @@ -32,8 +32,10 @@ public InteractiveQueryServiceExample(InteractiveQueryService interactiveQuerySe * @return the Long count of the word in the store */ public Long getWordCount(String stateStore, String word) { - Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); - return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(word)).orElse(0L); + Optional> queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> + kvReadOnlyKeyValueStore.get(word)).orElse(0L); } /** @@ -44,8 +46,10 @@ public Long getWordCount(String stateStore, String word) { * @return the byte[] stored in the state store */ public byte[] getBytes(String stateStore, String blobName) { - Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); - return queryableStore.map(stringReadOnlyKeyValueStore -> stringReadOnlyKeyValueStore.get(blobName)).orElse(null); + Optional> queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(stringReadOnlyKeyValueStore -> + stringReadOnlyKeyValueStore.get(blobName)).orElse(null); } /** @@ -56,8 +60,10 @@ public byte[] getBytes(String stateStore, String blobName) { * @return the value of type V stored in the state store */ public V getGenericKeyValue(String stateStore, K name) { - Optional> queryableStore = interactiveQueryService.getQueryableStore(stateStore, QueryableStoreTypes.keyValueStore()); - return queryableStore.map(kvReadOnlyKeyValueStore -> kvReadOnlyKeyValueStore.get(name)).orElse(null); + Optional> queryableStore = interactiveQueryService.getQueryableStore( + stateStore, QueryableStoreTypes.keyValueStore()); + return queryableStore.map(kvReadOnlyKeyValueStore -> + kvReadOnlyKeyValueStore.get(name)).orElse(null); } } // end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java index 74bca6eca..fdcf42acd 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountClient.java @@ -11,7 +11,7 @@ @KafkaClient public interface WordCountClient { - @Topic(WordCountStream.INPUT) + @Topic("streams-plaintext-input") void publishSentence(String sentence); } // end::clazz[] diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java index 90bcc0799..768f03adb 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountListener.java @@ -20,7 +20,7 @@ public class WordCountListener { private final Map wordCounts = new ConcurrentHashMap<>(); - @Topic(WordCountStream.OUTPUT) + @Topic("streams-wordcount-output") void count(@KafkaKey String word, long count) { wordCounts.put(word, count); } diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java index 2a8d175fe..9930ebd3e 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java @@ -24,18 +24,12 @@ // tag::clazz[] @Factory public class WordCountStream { - - public static final String STREAM_WORD_COUNT = "word-count"; - public static final String INPUT = "streams-plaintext-input"; // <1> - public static final String OUTPUT = "streams-wordcount-output"; // <2> - public static final String WORD_COUNT_STORE = "word-count-store"; - // end::clazz[] // tag::wordCountStream[] @Singleton - @Named(STREAM_WORD_COUNT) - KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3> + @Named("word-count") + KStream wordCountStream(ConfiguredStreamBuilder builder) { // <1> // set default serdes Properties props = builder.getConfiguration(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); @@ -43,33 +37,28 @@ KStream wordCountStream(ConfiguredStreamBuilder builder) { // <3 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500"); - KStream source = builder.stream(INPUT); + KStream source = builder.stream("streams-plaintext-input"); // <2> KTable groupedByWord = source - .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) - .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) - //Store the result in a store for lookup later - .count(Materialized.as(WORD_COUNT_STORE)); // <4> + .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) + .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) + //Store the result in a store for lookup later + .count(Materialized.as("word-count-store")); // <3> groupedByWord - //convert to stream - .toStream() - //send to output using specific serdes - .to(OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); + //convert to stream + .toStream() + //send to output using specific serdes + .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); // <4> return source; } // end::wordCountStream[] // tag::namedStream[] - public static final String MY_STREAM = "my-stream"; - public static final String NAMED_WORD_COUNT_INPUT = "named-word-count-input"; - public static final String NAMED_WORD_COUNT_OUTPUT = "named-word-count-output"; - @Singleton - @Named(MY_STREAM) - KStream myStream( - @Named(MY_STREAM) ConfiguredStreamBuilder builder) { + @Named("my-stream") + KStream myStream(@Named("my-stream") ConfiguredStreamBuilder builder) { // end::namedStream[] // set default serdes @@ -79,14 +68,22 @@ KStream myStream( props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "500"); - KStream source = builder.stream(NAMED_WORD_COUNT_INPUT); + KStream source = builder.stream("named-word-count-input"); KTable counts = source - .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) - .groupBy((key, value) -> value) - .count(); + .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))) + .groupBy((key, value) -> value) + .count(); // need to override value serde to Long type - counts.toStream().to(NAMED_WORD_COUNT_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); + counts.toStream().to("named-word-count-output", Produced.with(Serdes.String(), Serdes.Long())); return source; } + + // tag::myOtherStream[] + @Singleton + @Named("my-other-stream") + KStream myOtherKStream(@Named("my-other-stream") ConfiguredStreamBuilder builder) { + return builder.stream("my-other-stream"); + } + // end::myOtherStream[] } From 542b02fc609b520aefe198ad4b52e2e1eea9cba0 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Fri, 25 Aug 2023 08:13:54 -0500 Subject: [PATCH 05/13] simplify display per original --- src/main/docs/guide/kafkaStreams.adoc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/docs/guide/kafkaStreams.adoc b/src/main/docs/guide/kafkaStreams.adoc index 163737c20..272f085f7 100644 --- a/src/main/docs/guide/kafkaStreams.adoc +++ b/src/main/docs/guide/kafkaStreams.adoc @@ -96,13 +96,13 @@ The above configuration sets the `num.stream.threads` setting of the Kafka `Stre You can then inject an `api:configuration.kafka.streams.ConfiguredStreamBuilder[]` specifically for the above configuration using `jakarta.inject.Named`: -snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="clazz, myOtherStream"] +snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="myOtherStream", indent=0] NOTE: If you do not provide a `@Named` on the `ConfiguredStreamBuilder` you have multiple KStreams defined that share the default configurations like client id, application id, etc.It is advisable when using multiple streams in a single app to provide a `@Named` instance of `ConfiguredStreamBuilder` for each stream. .Kafka Streams Word Count -snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="clazz, namedStream"] +snippet::io.micronaut.kafka.docs.streams.WordCountStream[tags="namedStream", indent=0] .Configuring Kafka Streams for testing When writing a test without starting the actual Kafka server, you can instruct Micronaut not to start Kafka Streams. To do this create a config file suffixed with an environment name, such as `application-test.yml` and set the `kafka.streams.[STREAM-NAME].start-kafka-streams` to `false`. From 9369857f3e5f7e8ab6fe20eac332314a5b79b9e0 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Fri, 25 Aug 2023 12:55:57 -0500 Subject: [PATCH 06/13] add test --- .../docs/streams/WordCountStreamTest.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java new file mode 100644 index 000000000..9ae7167bf --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java @@ -0,0 +1,23 @@ +package io.micronaut.kafka.docs.streams; + +import io.micronaut.context.ApplicationContext; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + +class WordCountStreamTest { + + @Test + void testWordCounter() { + try (ApplicationContext ctx = ApplicationContext.run( + Map.of("kafka.enabled", "true", "spec.name", "WordCountStreamTest") + )) { + assertDoesNotThrow(() -> { + WordCountClient client = ctx.getBean(WordCountClient.class); + client.publishSentence("Test to test for words"); + }); + } + } +} From 7827a19c2b2751fb74aad645b2a939617bf649ae Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Mon, 28 Aug 2023 15:46:41 +0200 Subject: [PATCH 07/13] Fix test beans --- .../java/io/micronaut/kafka/docs/streams/WordCountStream.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java index 9930ebd3e..6b43501c6 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java @@ -58,7 +58,7 @@ KStream wordCountStream(ConfiguredStreamBuilder builder) { // <1 // tag::namedStream[] @Singleton @Named("my-stream") - KStream myStream(@Named("my-stream") ConfiguredStreamBuilder builder) { + KStream myStream(ConfiguredStreamBuilder builder) { // end::namedStream[] // set default serdes @@ -82,7 +82,7 @@ KStream myStream(@Named("my-stream") ConfiguredStreamBuilder bui // tag::myOtherStream[] @Singleton @Named("my-other-stream") - KStream myOtherKStream(@Named("my-other-stream") ConfiguredStreamBuilder builder) { + KStream myOtherKStream(ConfiguredStreamBuilder builder) { return builder.stream("my-other-stream"); } // end::myOtherStream[] From 4caeb151ca0a564647807425235b15dc783a038e Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Mon, 28 Aug 2023 10:42:25 -0500 Subject: [PATCH 08/13] change test [skip ci] --- .../docs/streams/WordCountStreamTest.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java index 9ae7167bf..f80b3eb11 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStreamTest.java @@ -1,23 +1,31 @@ package io.micronaut.kafka.docs.streams; import io.micronaut.context.ApplicationContext; +import io.micronaut.core.util.StringUtils; import org.junit.jupiter.api.Test; import java.util.Map; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; class WordCountStreamTest { @Test void testWordCounter() { try (ApplicationContext ctx = ApplicationContext.run( - Map.of("kafka.enabled", "true", "spec.name", "WordCountStreamTest") + Map.of("kafka.enabled", StringUtils.TRUE, "spec.name", "WordCountStreamTest") )) { - assertDoesNotThrow(() -> { - WordCountClient client = ctx.getBean(WordCountClient.class); - client.publishSentence("Test to test for words"); - }); + WordCountClient client = ctx.getBean(WordCountClient.class); + client.publishSentence("test to test for words"); + WordCountListener listener = ctx.getBean(WordCountListener.class); + await().atMost(10, SECONDS).until(() -> + listener.getWordCounts().size() == 4 && + listener.getCount("test") == 2 && + listener.getCount("to") == 1 && + listener.getCount("for") == 1 && + listener.getCount("words") == 1 + ); } } } From 58e04f564c9ccad6a63c99169493337b383bd70f Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Mon, 28 Aug 2023 18:32:58 +0200 Subject: [PATCH 09/13] Create missing source topics for the kafka streams test --- .../docs/streams/KafkaTestInitializer.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java new file mode 100644 index 000000000..d36b9637b --- /dev/null +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java @@ -0,0 +1,64 @@ +package io.micronaut.kafka.docs.streams; + +import io.micronaut.context.annotation.BootstrapContextCompatible; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import io.micronaut.context.env.BootstrapPropertySourceLocator; +import io.micronaut.context.env.Environment; +import io.micronaut.context.env.PropertySource; +import io.micronaut.context.exceptions.ConfigurationException; +import jakarta.annotation.PostConstruct; +import jakarta.inject.Singleton; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Requires(property = "spec.name", value = "WordCountStreamTest") +@BootstrapContextCompatible +@Singleton +public class KafkaTestInitializer implements BootstrapPropertySourceLocator { + + private final Map adminProps; + + public KafkaTestInitializer(@Value("${kafka.bootstrap.servers}") String bootstrapServers) { + this.adminProps = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + } + + @PostConstruct + void initializeTopics() { + createTopics( + Stream.of( + "streams-plaintext-input", + "named-word-count-input", + "my-other-stream", + "no-op-input" + ).map(topicName -> configureTopic(topicName, 1, 1)).collect(Collectors.toSet()) + ); + } + + @Override + public Iterable findPropertySources(Environment environment) throws ConfigurationException { + return BootstrapPropertySourceLocator.EMPTY_LOCATOR.findPropertySources(environment); + } + + private NewTopic configureTopic(String name, int numPartitions, int replicationFactor) { + return new NewTopic(name, numPartitions, (short) replicationFactor); + } + + private void createTopics(Set topicsToCreate) { + try (AdminClient admin = AdminClient.create(adminProps)) { + Set existingTopics = admin.listTopics().names().get(); + Set newTopics = topicsToCreate.stream().filter(newTopic -> !existingTopics.contains(newTopic.name())).collect(Collectors.toSet()); + admin.createTopics(newTopics).all().get(); + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException("Failed to initialize test kafka topics", e); + } + } +} From 68390f85792b8e68592c99f4c0f3606fdb37590b Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Mon, 28 Aug 2023 12:48:43 -0500 Subject: [PATCH 10/13] add tests --- .../docs/streams/KafkaTestInitializer.groovy | 62 +++++++++++++++++ .../kafka/docs/streams/WordCountStream.groovy | 4 +- .../docs/streams/WordCountStreamTest.groovy | 34 ++++++++++ .../docs/streams/KafkaTestInitializer.kt | 66 +++++++++++++++++++ .../kafka/docs/streams/WordCountStream.kt | 4 +- .../kafka/docs/streams/WordCountStreamTest.kt | 31 +++++++++ .../docs/streams/KafkaTestInitializer.java | 3 +- 7 files changed, 199 insertions(+), 5 deletions(-) create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/KafkaTestInitializer.groovy create mode 100644 test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStreamTest.groovy create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/KafkaTestInitializer.kt create mode 100644 test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/KafkaTestInitializer.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/KafkaTestInitializer.groovy new file mode 100644 index 000000000..059714492 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/KafkaTestInitializer.groovy @@ -0,0 +1,62 @@ +package io.micronaut.kafka.docs.streams + +import io.micronaut.context.annotation.BootstrapContextCompatible +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import io.micronaut.context.env.BootstrapPropertySourceLocator +import io.micronaut.context.env.Environment +import io.micronaut.context.env.PropertySource +import io.micronaut.context.exceptions.ConfigurationException +import jakarta.annotation.PostConstruct +import jakarta.inject.Singleton +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic + +import java.util.concurrent.ExecutionException +import java.util.stream.Collectors +import java.util.stream.Stream + +@Requires(property = 'spec.name', value = 'WordCountStreamTest') +@BootstrapContextCompatible +@Singleton +class KafkaTestInitializer implements BootstrapPropertySourceLocator { + + private final Map adminProps + + KafkaTestInitializer(@Value('${kafka.bootstrap.servers}') String bootstrapServers) { + this.adminProps = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + } + + @PostConstruct + void initializeTopics() { + createTopics( + Stream.of( + 'streams-plaintext-input', + 'named-word-count-input', + 'my-other-stream', + 'no-op-input' + ).map(topicName -> configureTopic(topicName, 1, 1)).collect(Collectors.toSet()) + ) + } + + @Override + Iterable findPropertySources(Environment environment) throws ConfigurationException { + return BootstrapPropertySourceLocator.EMPTY_LOCATOR.findPropertySources(environment) + } + + private NewTopic configureTopic(String name, int numPartitions, int replicationFactor) { + return new NewTopic(name, numPartitions, (short) replicationFactor) + } + + private void createTopics(Set topicsToCreate) { + try (AdminClient admin = AdminClient.create(adminProps)) { + Set existingTopics = admin.listTopics().names().get() + Set newTopics = topicsToCreate.stream().filter(newTopic -> + !existingTopics.contains(newTopic.name())).collect(Collectors.toSet()); + admin.createTopics(newTopics).all().get() + } catch (ExecutionException | InterruptedException e) { + throw new IllegalStateException('Failed to initialize test kafka topics', e) + } + } +} diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy index 7f30a9039..b73fdcf27 100644 --- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy @@ -54,7 +54,7 @@ class WordCountStream { // tag::namedStream[] @Singleton @Named('my-stream') - KStream myStream(@Named('my-stream') ConfiguredStreamBuilder builder) { + KStream myStream(ConfiguredStreamBuilder builder) { // end::namedStream[] // set default serdes @@ -78,7 +78,7 @@ class WordCountStream { // tag::myOtherStream[] @Singleton @Named('my-other-stream') - KStream myOtherKStream(@Named('my-other-stream') ConfiguredStreamBuilder builder) { + KStream myOtherKStream(ConfiguredStreamBuilder builder) { return builder.stream('my-other-stream') } // end::myOtherStream[] diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStreamTest.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStreamTest.groovy new file mode 100644 index 000000000..b912189c7 --- /dev/null +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStreamTest.groovy @@ -0,0 +1,34 @@ +package io.micronaut.kafka.docs.streams + +import io.micronaut.context.ApplicationContext +import spock.lang.Specification +import spock.util.concurrent.PollingConditions + +class WordCountStreamTest extends Specification { + + PollingConditions conditions = new PollingConditions() + + void "test word counter"() { + given: + ApplicationContext ctx = ApplicationContext.run( + 'kafka.enabled': true, 'spec.name': 'WordCountStreamTest' + ) + + when: + WordCountClient client = ctx.getBean(WordCountClient) + client.publishSentence('test to test for words') + + then: + WordCountListener listener = ctx.getBean(WordCountListener) + conditions.within(10) { + listener.getWordCounts().size() == 4 && + listener.getCount('test') == 2 && + listener.getCount('to') == 1 && + listener.getCount('for') == 1 && + listener.getCount('words') == 1 + } + + cleanup: + ctx.close() + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/KafkaTestInitializer.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/KafkaTestInitializer.kt new file mode 100644 index 000000000..da3dc2dba --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/KafkaTestInitializer.kt @@ -0,0 +1,66 @@ +package io.micronaut.kafka.docs.streams + +import io.micronaut.context.annotation.BootstrapContextCompatible +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import io.micronaut.context.env.BootstrapPropertySourceLocator +import io.micronaut.context.env.Environment +import io.micronaut.context.env.PropertySource +import io.micronaut.context.exceptions.ConfigurationException +import jakarta.annotation.PostConstruct +import jakarta.inject.Singleton +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic +import java.util.* +import java.util.concurrent.ExecutionException +import java.util.stream.Collectors +import java.util.stream.Stream + +@Requires(property = "spec.name", value = "WordCountStreamTest") +@BootstrapContextCompatible +@Singleton +class KafkaTestInitializer(@Value("\${kafka.bootstrap.servers}") bootstrapServers: String) : + BootstrapPropertySourceLocator { + private val adminProps: Map + + init { + adminProps = Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers) + } + + @PostConstruct + fun initializeTopics() { + createTopics( + Stream.of( + "streams-plaintext-input", + "named-word-count-input", + "my-other-stream", + "no-op-input" + ).map { topicName: String -> configureTopic(topicName, 1, 1) }.collect(Collectors.toSet()) + ) + } + + @Throws(ConfigurationException::class) + override fun findPropertySources(environment: Environment): Iterable { + return BootstrapPropertySourceLocator.EMPTY_LOCATOR.findPropertySources(environment) + } + + private fun configureTopic(name: String, numPartitions: Int, replicationFactor: Int): NewTopic { + return NewTopic(name, numPartitions, replicationFactor.toShort()) + } + + private fun createTopics(topicsToCreate: Set) { + try { + AdminClient.create(adminProps).use { admin -> + val existingTopics = admin.listTopics().names().get() + val newTopics = topicsToCreate.stream().filter { newTopic: NewTopic -> + !existingTopics.contains(newTopic.name()) }.collect(Collectors.toSet()) + admin.createTopics(newTopics).all().get() + } + } catch (e: ExecutionException) { + throw IllegalStateException("Failed to initialize test kafka topics", e) + } catch (e: InterruptedException) { + throw IllegalStateException("Failed to initialize test kafka topics", e) + } + } +} diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt index 2de8ae40d..f8886cf2b 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt @@ -57,7 +57,7 @@ class WordCountStream { // tag::namedStream[] @Singleton @Named("my-stream") - fun myStream(@Named("my-stream") builder: ConfiguredStreamBuilder): KStream { + fun myStream(builder: ConfiguredStreamBuilder): KStream { // end::namedStream[] // set default serdes @@ -84,7 +84,7 @@ class WordCountStream { // tag::myOtherStream[] @Singleton @Named("my-other-stream") - fun myOtherKStream(@Named("my-other-stream") builder: ConfiguredStreamBuilder): KStream { + fun myOtherKStream(builder: ConfiguredStreamBuilder): KStream { return builder.stream("my-other-stream") } // end::myOtherStream[] diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt new file mode 100644 index 000000000..67ae76814 --- /dev/null +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt @@ -0,0 +1,31 @@ +package io.micronaut.kafka.docs.streams + +import io.micronaut.context.ApplicationContext +import io.micronaut.core.util.StringUtils +import org.awaitility.Awaitility +import org.junit.jupiter.api.Test +import java.util.Map +import java.util.concurrent.TimeUnit + +internal class WordCountStreamTest { + + @Test + fun testWordCounter() { + ApplicationContext.run( + Map.of("kafka.enabled", StringUtils.TRUE, "spec.name", "WordCountStreamTest") + ).use { ctx -> + val client = ctx.getBean(WordCountClient::class.java) + client.publishSentence("test to test for words") + + val listener = ctx.getBean(WordCountListener::class.java) + + Awaitility.await().atMost(10, TimeUnit.SECONDS).until { + listener.getWordCounts().size == 4 && + listener.getCount("test") == 2L && + listener.getCount("to") == 1L && + listener.getCount("for") == 1L && + listener.getCount("words") == 1L + } + } + } +} diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java index d36b9637b..0b468b0c4 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/KafkaTestInitializer.java @@ -55,7 +55,8 @@ private NewTopic configureTopic(String name, int numPartitions, int replicationF private void createTopics(Set topicsToCreate) { try (AdminClient admin = AdminClient.create(adminProps)) { Set existingTopics = admin.listTopics().names().get(); - Set newTopics = topicsToCreate.stream().filter(newTopic -> !existingTopics.contains(newTopic.name())).collect(Collectors.toSet()); + Set newTopics = topicsToCreate.stream().filter(newTopic -> + !existingTopics.contains(newTopic.name())).collect(Collectors.toSet()); admin.createTopics(newTopics).all().get(); } catch (ExecutionException | InterruptedException e) { throw new IllegalStateException("Failed to initialize test kafka topics", e); From 507a4cf3e75f48798cef51f7e1a25210eab1bc08 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Mon, 28 Aug 2023 15:12:39 -0500 Subject: [PATCH 11/13] cleanup kotlin test --- .../io/micronaut/kafka/docs/streams/WordCountStreamTest.kt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt index 67ae76814..b7be9e011 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStreamTest.kt @@ -2,9 +2,8 @@ package io.micronaut.kafka.docs.streams import io.micronaut.context.ApplicationContext import io.micronaut.core.util.StringUtils -import org.awaitility.Awaitility +import org.awaitility.Awaitility.await import org.junit.jupiter.api.Test -import java.util.Map import java.util.concurrent.TimeUnit internal class WordCountStreamTest { @@ -12,14 +11,14 @@ internal class WordCountStreamTest { @Test fun testWordCounter() { ApplicationContext.run( - Map.of("kafka.enabled", StringUtils.TRUE, "spec.name", "WordCountStreamTest") + mapOf("kafka.enabled" to StringUtils.TRUE, "spec.name" to "WordCountStreamTest") ).use { ctx -> val client = ctx.getBean(WordCountClient::class.java) client.publishSentence("test to test for words") val listener = ctx.getBean(WordCountListener::class.java) - Awaitility.await().atMost(10, TimeUnit.SECONDS).until { + await().atMost(10, TimeUnit.SECONDS).until { listener.getWordCounts().size == 4 && listener.getCount("test") == 2L && listener.getCount("to") == 1L && From beae51f1828b3d9301a13d9be9841fe200c555e8 Mon Sep 17 00:00:00 2001 From: Dean Wette Date: Mon, 28 Aug 2023 15:42:27 -0500 Subject: [PATCH 12/13] unique names for stream stores in test (hack to illustrate problem with repeatable tests). --- .../io/micronaut/kafka/docs/streams/WordCountStream.groovy | 2 +- .../kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt | 2 +- .../java/io/micronaut/kafka/docs/streams/WordCountStream.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy index b73fdcf27..eaba056e6 100644 --- a/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy +++ b/test-suite-groovy/src/test/groovy/io/micronaut/kafka/docs/streams/WordCountStream.groovy @@ -39,7 +39,7 @@ class WordCountStream { .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) //Store the result in a store for lookup later - .count(Materialized.as('word-count-store')) // <3> + .count(Materialized.as('word-count-store-groovy')) // <3> groupedByWord //convert to stream diff --git a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt index f8886cf2b..5b68aeb81 100644 --- a/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt +++ b/test-suite-kotlin/src/test/kotlin/io/micronaut/kafka/docs/streams/WordCountStream.kt @@ -44,7 +44,7 @@ class WordCountStream { Grouped.with(Serdes.String(), Serdes.String()) ) //Store the result in a store for lookup later - .count(Materialized.`as`("word-count-store")) // <3> + .count(Materialized.`as`("word-count-store-kotlin")) // <3> groupedByWord //convert to stream .toStream() diff --git a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java index 6b43501c6..19c18fc47 100644 --- a/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java +++ b/test-suite/src/test/java/io/micronaut/kafka/docs/streams/WordCountStream.java @@ -43,7 +43,7 @@ KStream wordCountStream(ConfiguredStreamBuilder builder) { // <1 .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String())) //Store the result in a store for lookup later - .count(Materialized.as("word-count-store")); // <3> + .count(Materialized.as("word-count-store-java")); // <3> groupedByWord //convert to stream From 1eabe03be57ad54f2235119f01e25171cc01d5d5 Mon Sep 17 00:00:00 2001 From: Guillermo Calvo Date: Tue, 29 Aug 2023 11:54:18 +0200 Subject: [PATCH 13/13] Configure kafka streams so that they use different state directories --- test-suite-groovy/src/test/resources/application-test.properties | 1 + test-suite-kotlin/src/test/resources/application-test.properties | 1 + test-suite/src/test/resources/application-test.properties | 1 + 3 files changed, 3 insertions(+) create mode 100644 test-suite-groovy/src/test/resources/application-test.properties create mode 100644 test-suite-kotlin/src/test/resources/application-test.properties create mode 100644 test-suite/src/test/resources/application-test.properties diff --git a/test-suite-groovy/src/test/resources/application-test.properties b/test-suite-groovy/src/test/resources/application-test.properties new file mode 100644 index 000000000..6eee4b87c --- /dev/null +++ b/test-suite-groovy/src/test/resources/application-test.properties @@ -0,0 +1 @@ +micronaut.application.name=test-suite-groovy-${random.uuid} diff --git a/test-suite-kotlin/src/test/resources/application-test.properties b/test-suite-kotlin/src/test/resources/application-test.properties new file mode 100644 index 000000000..40708f310 --- /dev/null +++ b/test-suite-kotlin/src/test/resources/application-test.properties @@ -0,0 +1 @@ +micronaut.application.name=test-suite-kotlin-${random.uuid} diff --git a/test-suite/src/test/resources/application-test.properties b/test-suite/src/test/resources/application-test.properties new file mode 100644 index 000000000..30fbffed9 --- /dev/null +++ b/test-suite/src/test/resources/application-test.properties @@ -0,0 +1 @@ +micronaut.application.name=test-suite-java-${random.uuid}