Skip to content

Commit

Permalink
#2663 Addressing review remarks;
Browse files Browse the repository at this point in the history
* Removing superfluous RegisterForReflection configuration
* Adding linebreak and comment
  • Loading branch information
gunnarmorling committed Jun 20, 2019
1 parent 42dd05f commit 32b42a6
Showing 1 changed file with 6 additions and 15 deletions.
21 changes: 6 additions & 15 deletions docs/src/main/asciidoc/kafka-streams-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ public class ValuesGenerator {
return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2>
.map(tick -> {
WeatherStation station = stations.get(random.nextInt(stations.size()));
double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature)
double temperature = new BigDecimal(
random.nextGaussian() * 15 + station.averageTemperature
)
.setScale(1, RoundingMode.HALF_UP)
.doubleValue();
Expand Down Expand Up @@ -174,6 +176,7 @@ For that, add the following to the file `producer/src/main/resources/application

[source]
----
# Configure the Kafka broker location
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
Expand Down Expand Up @@ -220,13 +223,14 @@ package org.acme.quarkus.sample.kafkastreams.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
@RegisterForReflection // <1>
public class WeatherStation {
public int id;
public String name;
}
----
<1> By adding the `@RegisterForReflection` annotation, it is ensured that this type can be instantiated reflectively when running the application in native mode.

Then the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`,
representing temperature measurements for a given station:
Expand Down Expand Up @@ -265,7 +269,6 @@ import java.math.RoundingMode;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class Aggregation {
public int stationId;
Expand Down Expand Up @@ -293,8 +296,6 @@ public class Aggregation {
}
----

All these types carry the `@RegisterForReflection`, allowing them to be instantiated reflectively when running the application in native mode.

Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file:

[source, java]
Expand Down Expand Up @@ -459,13 +460,6 @@ public class KafkaStreamsPipeline {
<7> The results of the pipeline are written out to the `temperatures-aggregated` topic
<8> The Quarkus `StartupEvent` and `ShutdownEvent` lifecycle events are used for starting and stopping the pipeline; as it may only be started once the topics exist, the Kafka admin client is used to check for their existence continuously, until they have been set up

Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents:

[source]
----
org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092
----

== Building and Running the Applications

We now can build the `producer` and `aggregator` applications:
Expand Down Expand Up @@ -661,9 +655,6 @@ which represents the actual aggregation result for a weather station:
----
package org.acme.quarkus.sample.kafkastreams.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class WeatherStationData {
public int stationId;
Expand Down

0 comments on commit 32b42a6

Please sign in to comment.