From 32b42a65636c4a2f484a1f52ba1edc50c46e73de Mon Sep 17 00:00:00 2001 From: Gunnar Morling Date: Thu, 20 Jun 2019 09:19:12 +0200 Subject: [PATCH] #2663 Addressing review remarks; * Removing superfluous RegisterForReflection configuration * Adding linebreak and comment --- .../main/asciidoc/kafka-streams-guide.adoc | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/docs/src/main/asciidoc/kafka-streams-guide.adoc b/docs/src/main/asciidoc/kafka-streams-guide.adoc index 31df9c67020412..df7a108c8667e2 100644 --- a/docs/src/main/asciidoc/kafka-streams-guide.adoc +++ b/docs/src/main/asciidoc/kafka-streams-guide.adoc @@ -129,7 +129,9 @@ public class ValuesGenerator { return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2> .map(tick -> { WeatherStation station = stations.get(random.nextInt(stations.size())); - double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature) + double temperature = new BigDecimal( + random.nextGaussian() * 15 + station.averageTemperature + ) .setScale(1, RoundingMode.HALF_UP) .doubleValue(); @@ -174,6 +176,7 @@ For that, add the following to the file `producer/src/main/resources/application [source] ---- +# Configure the Kafka broker location kafka.bootstrap.servers=localhost:9092 mp.messaging.outgoing.temperature-values.connector=smallrye-kafka @@ -220,13 +223,14 @@ package org.acme.quarkus.sample.kafkastreams.model; import io.quarkus.runtime.annotations.RegisterForReflection; -@RegisterForReflection +@RegisterForReflection // <1> public class WeatherStation { public int id; public String name; } ---- +<1> By adding the `@RegisterForReflection` annotation, it is ensured that this type can be instantiated reflectively when running the application in native mode. Then the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`, representing temperature measurements for a given station: @@ -265,7 +269,6 @@ import java.math.RoundingMode; import io.quarkus.runtime.annotations.RegisterForReflection; -@RegisterForReflection public class Aggregation { public int stationId; @@ -293,8 +296,6 @@ public class Aggregation { } ---- -All these types carry the `@RegisterForReflection`, allowing them to be instantiated reflectively when running the application in native mode. - Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file: [source, java] @@ -459,13 +460,6 @@ public class KafkaStreamsPipeline { <7> The results of the pipeline are written out to the `temperatures-aggregated` topic <8> The Quarkus `StartupEvent` and `ShutdownEvent` lifecycle events are used for starting and stopping the pipeline; as it may only be started once the topics exist, the Kafka admin client is used to check for their existence continuously, until they have been set up -Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents: - -[source] ----- -org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092 ----- - == Building and Running the Applications We now can build the `producer` and `aggregator` applications: @@ -661,9 +655,6 @@ which represents the actual aggregation result for a weather station: ---- package org.acme.quarkus.sample.kafkastreams.model; -import io.quarkus.runtime.annotations.RegisterForReflection; - -@RegisterForReflection public class WeatherStationData { public int stationId;