Skip to content

Commit

Permalink
#2663 Addressing review remarks;
Browse files Browse the repository at this point in the history
* Removing superfluous RegisterForReflection configuration
* Adding linebreaks to avoid overflowing listings
* Adding onBackpressureDrop()
* Commenting config
  • Loading branch information
gunnarmorling committed Jun 20, 2019
1 parent 42dd05f commit 08051d1
Showing 1 changed file with 52 additions and 32 deletions.
84 changes: 52 additions & 32 deletions docs/src/main/asciidoc/kafka-streams-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,12 @@ public class ValuesGenerator {
public Flowable<KafkaMessage<Integer, String>> generate() {
return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2>
.onBackpressureDrop()
.map(tick -> {
WeatherStation station = stations.get(random.nextInt(stations.size()));
double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature)
double temperature = new BigDecimal(
random.nextGaussian() * 15 + station.averageTemperature
)
.setScale(1, RoundingMode.HALF_UP)
.doubleValue();
Expand All @@ -141,7 +144,10 @@ public class ValuesGenerator {
@Outgoing("weather-stations") // <3>
public Flowable<KafkaMessage<Integer, String>> weatherStations() {
List<KafkaMessage<Integer, String>> stationsAsJson = stations.stream()
.map(s -> KafkaMessage.of(s.id, "{ \"id\" : " + s.id + ", \"name\" : \"" + s.name + "\" }"))
.map(s -> KafkaMessage.of(
s.id,
"{ \"id\" : " + s.id +
", \"name\" : \"" + s.name + "\" }"))
.collect(Collectors.toList());
return Flowable.fromIterable(stationsAsJson);
Expand Down Expand Up @@ -174,6 +180,7 @@ For that, add the following to the file `producer/src/main/resources/application

[source]
----
# Configure the Kafka broker location
kafka.bootstrap.servers=localhost:9092
mp.messaging.outgoing.temperature-values.connector=smallrye-kafka
Expand Down Expand Up @@ -220,13 +227,14 @@ package org.acme.quarkus.sample.kafkastreams.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
@RegisterForReflection // <1>
public class WeatherStation {
public int id;
public String name;
}
----
<1> By adding the `@RegisterForReflection` annotation, it is ensured that this type can be instantiated reflectively when running the application in native mode.

Then the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`,
representing temperature measurements for a given station:
Expand All @@ -244,7 +252,8 @@ public class TemperatureMeasurement {
public Instant timestamp;
public double value;
public TemperatureMeasurement(int stationId, String stationName, Instant timestamp, double value) {
public TemperatureMeasurement(int stationId, String stationName, Instant timestamp,
double value) {
this.stationId = stationId;
this.stationName = stationName;
this.timestamp = timestamp;
Expand All @@ -265,7 +274,6 @@ import java.math.RoundingMode;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class Aggregation {
public int stationId;
Expand Down Expand Up @@ -293,8 +301,6 @@ public class Aggregation {
}
----

All these types carry the `@RegisterForReflection`, allowing them to be instantiated reflectively when running the application in native mode.

Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file:

[source, java]
Expand Down Expand Up @@ -355,7 +361,10 @@ public class KafkaStreamsPipeline {
private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipeline.class);
@ConfigProperty(name="org.acme.quarkus.sample.kafkastreams.bootstrap.servers", defaultValue="localhost:9092")
@ConfigProperty(
name="org.acme.quarkus.sample.kafkastreams.bootstrap.servers",
defaultValue="localhost:9092"
)
String bootstrapServers;
@ConfigProperty(name="HOSTNAME") // <1>
Expand All @@ -379,10 +388,12 @@ public class KafkaStreamsPipeline {
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
JsonbSerde<WeatherStation> weatherStationSerde = new JsonbSerde<>(WeatherStation.class);
JsonbSerde<WeatherStation> weatherStationSerde = new JsonbSerde<>(
WeatherStation.class);
JsonbSerde<Aggregation> aggregationSerde = new JsonbSerde<>(Aggregation.class);
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(WEATHER_STATIONS_STORE);
KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(
WEATHER_STATIONS_STORE);
GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( // <2>
WEATHER_STATIONS_TOPIC,
Expand All @@ -397,7 +408,8 @@ public class KafkaStreamsPipeline {
(stationId, timestampAndValue) -> stationId,
(timestampAndValue, station) -> {
String[] parts = timestampAndValue.split(";");
return new TemperatureMeasurement(station.id, station.name, Instant.parse(parts[0]), Double.valueOf(parts[1]));
return new TemperatureMeasurement(station.id, station.name,
Instant.parse(parts[0]), Double.valueOf(parts[1]));
}
)
.groupByKey() // <5>
Expand Down Expand Up @@ -437,11 +449,16 @@ public class KafkaStreamsPipeline {
ListTopicsResult topics = adminClient.listTopics();
Set<String> topicNames = topics.names().get(60, TimeUnit.SECONDS);
if (topicNames.contains(WEATHER_STATIONS_TOPIC) && topicNames.contains(TEMPERATURE_VALUES_TOPIC)) {
if (topicNames.contains(WEATHER_STATIONS_TOPIC) &&
topicNames.contains(TEMPERATURE_VALUES_TOPIC)) {
return;
}
LOG.info("Waiting for topics {} and {} to be created", WEATHER_STATIONS_TOPIC, TEMPERATURE_VALUES_TOPIC);
LOG.info(
"Waiting for topics {} and {} to be created",
WEATHER_STATIONS_TOPIC,
TEMPERATURE_VALUES_TOPIC
);
Thread.sleep(1_000);
}
} catch (InterruptedException | ExecutionException | TimeoutException e) {
Expand All @@ -459,13 +476,6 @@ public class KafkaStreamsPipeline {
<7> The results of the pipeline are written out to the `temperatures-aggregated` topic
<8> The Quarkus `StartupEvent` and `ShutdownEvent` lifecycle events are used for starting and stopping the pipeline; as it may only be started once the topics exist, the Kafka admin client is used to check for their existence continuously, until they have been set up

Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents:

[source]
----
org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092
----

== Building and Running the Applications

We now can build the `producer` and `aggregator` applications:
Expand Down Expand Up @@ -609,7 +619,8 @@ public GetWeatherStationDataResult getWeatherStationData(int id) {
private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() {
while (true) {
try {
return streams.store(WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore());
return streams.store(WEATHER_STATIONS_STORE,
QueryableStoreTypes.keyValueStore());
} catch (InvalidStateStoreException e) {
// ignore, store not ready yet
}
Expand All @@ -632,7 +643,8 @@ import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData;
public class GetWeatherStationDataResult {
private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null);
private static GetWeatherStationDataResult NOT_FOUND =
new GetWeatherStationDataResult(null);
private final WeatherStationData result;
Expand Down Expand Up @@ -661,9 +673,6 @@ which represents the actual aggregation result for a weather station:
----
package org.acme.quarkus.sample.kafkastreams.model;
import io.quarkus.runtime.annotations.RegisterForReflection;
@RegisterForReflection
public class WeatherStationData {
public int stationId;
Expand All @@ -673,7 +682,8 @@ public class WeatherStationData {
public int count;
public double avg;
private WeatherStationData(int stationId, String stationName, double min, double max, int count, double avg) {
private WeatherStationData(int stationId, String stationName, double min, double max,
int count, double avg) {
this.stationId = stationId;
this.stationName = stationName;
this.min = min;
Expand Down Expand Up @@ -737,7 +747,8 @@ public class WeatherStationEndpoint {
return Response.ok(result.getResult().get()).build();
}
else {
return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build();
return Response.status(Status.NOT_FOUND.getStatusCode(),
"No data found for weather station " + id).build();
}
}
}
Expand Down Expand Up @@ -816,7 +827,12 @@ public GetWeatherStationDataResult getWeatherStationData(int id) {
}
}
else { // <3>
LOG.info("Found data for key {} on remote host {}:{}", id, metadata.host(), metadata.port());
LOG.info(
"Found data for key {} on remote host {}:{}",
id,
metadata.host(),
metadata.port()
);
return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port());
}
}
Expand Down Expand Up @@ -852,13 +868,15 @@ import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData;
public class GetWeatherStationDataResult {
private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null, null, null);
private static GetWeatherStationDataResult NOT_FOUND =
new GetWeatherStationDataResult(null, null, null);
private final WeatherStationData result;
private final String host;
private final Integer port;
private GetWeatherStationDataResult(WeatherStationData result, String host, Integer port) {
private GetWeatherStationDataResult(WeatherStationData result, String host,
Integer port) {
this.result = result;
this.host = host;
this.port = port;
Expand Down Expand Up @@ -954,11 +972,13 @@ public class WeatherStationEndpoint {
return Response.ok(result.getResult().get()).build();
}
else if (result.getHost().isPresent()) { // <2>
URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(), id);
URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(),
id);
return Response.seeOther(otherUri).build();
}
else { // <3>
return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build();
return Response.status(Status.NOT_FOUND.getStatusCode(),
"No data found for weather station " + id).build();
}
}
Expand Down

0 comments on commit 08051d1

Please sign in to comment.