diff --git a/docs/src/main/asciidoc/kafka-streams-guide.adoc b/docs/src/main/asciidoc/kafka-streams-guide.adoc index 31df9c67020412..145f72a916ca7b 100644 --- a/docs/src/main/asciidoc/kafka-streams-guide.adoc +++ b/docs/src/main/asciidoc/kafka-streams-guide.adoc @@ -129,7 +129,9 @@ public class ValuesGenerator { return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2> .map(tick -> { WeatherStation station = stations.get(random.nextInt(stations.size())); - double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature) + double temperature = new BigDecimal( + random.nextGaussian() * 15 + station.averageTemperature + ) .setScale(1, RoundingMode.HALF_UP) .doubleValue(); @@ -220,13 +222,14 @@ package org.acme.quarkus.sample.kafkastreams.model; import io.quarkus.runtime.annotations.RegisterForReflection; -@RegisterForReflection +@RegisterForReflection // <1> public class WeatherStation { public int id; public String name; } ---- +<1> By adding the `@RegisterForReflection` annotation, it is ensured that this type can be instantiated reflectively when running the application in native mode. Then the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`, representing temperature measurements for a given station: @@ -265,7 +268,6 @@ import java.math.RoundingMode; import io.quarkus.runtime.annotations.RegisterForReflection; -@RegisterForReflection public class Aggregation { public int stationId; @@ -293,8 +295,6 @@ public class Aggregation { } ---- -All these types carry the `@RegisterForReflection`, allowing them to be instantiated reflectively when running the application in native mode. - Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file: [source, java] @@ -459,13 +459,6 @@ public class KafkaStreamsPipeline { <7> The results of the pipeline are written out to the `temperatures-aggregated` topic <8> The Quarkus `StartupEvent` and `ShutdownEvent` lifecycle events are used for starting and stopping the pipeline; as it may only be started once the topics exist, the Kafka admin client is used to check for their existence continuously, until they have been set up -Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents: - -[source] ----- -org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092 ----- - == Building and Running the Applications We now can build the `producer` and `aggregator` applications: @@ -661,9 +654,6 @@ which represents the actual aggregation result for a weather station: ---- package org.acme.quarkus.sample.kafkastreams.model; -import io.quarkus.runtime.annotations.RegisterForReflection; - -@RegisterForReflection public class WeatherStationData { public int stationId;