diff --git a/docs/src/main/asciidoc/kafka-streams-guide.adoc b/docs/src/main/asciidoc/kafka-streams-guide.adoc index 31df9c67020412..0e14165508754a 100644 --- a/docs/src/main/asciidoc/kafka-streams-guide.adoc +++ b/docs/src/main/asciidoc/kafka-streams-guide.adoc @@ -129,7 +129,9 @@ public class ValuesGenerator { return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2> .map(tick -> { WeatherStation station = stations.get(random.nextInt(stations.size())); - double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature) + double temperature = new BigDecimal( + random.nextGaussian() * 15 + station.averageTemperature + ) .setScale(1, RoundingMode.HALF_UP) .doubleValue(); @@ -141,7 +143,10 @@ public class ValuesGenerator { @Outgoing("weather-stations") // <3> public Flowable> weatherStations() { List> stationsAsJson = stations.stream() - .map(s -> KafkaMessage.of(s.id, "{ \"id\" : " + s.id + ", \"name\" : \"" + s.name + "\" }")) + .map(s -> KafkaMessage.of( + s.id, + "{ \"id\" : " + s.id + + ", \"name\" : \"" + s.name + "\" }")) .collect(Collectors.toList()); return Flowable.fromIterable(stationsAsJson); @@ -174,6 +179,7 @@ For that, add the following to the file `producer/src/main/resources/application [source] ---- +# Configure the Kafka broker location kafka.bootstrap.servers=localhost:9092 mp.messaging.outgoing.temperature-values.connector=smallrye-kafka @@ -220,13 +226,14 @@ package org.acme.quarkus.sample.kafkastreams.model; import io.quarkus.runtime.annotations.RegisterForReflection; -@RegisterForReflection +@RegisterForReflection // <1> public class WeatherStation { public int id; public String name; } ---- +<1> By adding the `@RegisterForReflection` annotation, it is ensured that this type can be instantiated reflectively when running the application in native mode. Then the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`, representing temperature measurements for a given station: @@ -244,7 +251,8 @@ public class TemperatureMeasurement { public Instant timestamp; public double value; - public TemperatureMeasurement(int stationId, String stationName, Instant timestamp, double value) { + public TemperatureMeasurement(int stationId, String stationName, Instant timestamp, + double value) { this.stationId = stationId; this.stationName = stationName; this.timestamp = timestamp; @@ -265,7 +273,6 @@ import java.math.RoundingMode; import io.quarkus.runtime.annotations.RegisterForReflection; -@RegisterForReflection public class Aggregation { public int stationId; @@ -293,8 +300,6 @@ public class Aggregation { } ---- -All these types carry the `@RegisterForReflection`, allowing them to be instantiated reflectively when running the application in native mode. - Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file: [source, java] @@ -355,7 +360,10 @@ public class KafkaStreamsPipeline { private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipeline.class); - @ConfigProperty(name="org.acme.quarkus.sample.kafkastreams.bootstrap.servers", defaultValue="localhost:9092") + @ConfigProperty( + name="org.acme.quarkus.sample.kafkastreams.bootstrap.servers", + defaultValue="localhost:9092" + ) String bootstrapServers; @ConfigProperty(name="HOSTNAME") // <1> @@ -379,10 +387,12 @@ public class KafkaStreamsPipeline { props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder = new StreamsBuilder(); - JsonbSerde weatherStationSerde = new JsonbSerde<>(WeatherStation.class); + JsonbSerde weatherStationSerde = new JsonbSerde<>( + WeatherStation.class); JsonbSerde aggregationSerde = new JsonbSerde<>(Aggregation.class); - KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(WEATHER_STATIONS_STORE); + KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore( + WEATHER_STATIONS_STORE); GlobalKTable stations = builder.globalTable( // <2> WEATHER_STATIONS_TOPIC, @@ -397,7 +407,8 @@ public class KafkaStreamsPipeline { (stationId, timestampAndValue) -> stationId, (timestampAndValue, station) -> { String[] parts = timestampAndValue.split(";"); - return new TemperatureMeasurement(station.id, station.name, Instant.parse(parts[0]), Double.valueOf(parts[1])); + return new TemperatureMeasurement(station.id, station.name, + Instant.parse(parts[0]), Double.valueOf(parts[1])); } ) .groupByKey() // <5> @@ -437,11 +448,16 @@ public class KafkaStreamsPipeline { ListTopicsResult topics = adminClient.listTopics(); Set topicNames = topics.names().get(60, TimeUnit.SECONDS); - if (topicNames.contains(WEATHER_STATIONS_TOPIC) && topicNames.contains(TEMPERATURE_VALUES_TOPIC)) { + if (topicNames.contains(WEATHER_STATIONS_TOPIC) && + topicNames.contains(TEMPERATURE_VALUES_TOPIC)) { return; } - LOG.info("Waiting for topics {} and {} to be created", WEATHER_STATIONS_TOPIC, TEMPERATURE_VALUES_TOPIC); + LOG.info( + "Waiting for topics {} and {} to be created", + WEATHER_STATIONS_TOPIC, + TEMPERATURE_VALUES_TOPIC + ); Thread.sleep(1_000); } } catch (InterruptedException | ExecutionException | TimeoutException e) { @@ -459,13 +475,6 @@ public class KafkaStreamsPipeline { <7> The results of the pipeline are written out to the `temperatures-aggregated` topic <8> The Quarkus `StartupEvent` and `ShutdownEvent` lifecycle events are used for starting and stopping the pipeline; as it may only be started once the topics exist, the Kafka admin client is used to check for their existence continuously, until they have been set up -Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents: - -[source] ----- -org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092 ----- - == Building and Running the Applications We now can build the `producer` and `aggregator` applications: @@ -609,7 +618,8 @@ public GetWeatherStationDataResult getWeatherStationData(int id) { private ReadOnlyKeyValueStore getWeatherStationStore() { while (true) { try { - return streams.store(WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore()); + return streams.store(WEATHER_STATIONS_STORE, + QueryableStoreTypes.keyValueStore()); } catch (InvalidStateStoreException e) { // ignore, store not ready yet } @@ -632,7 +642,8 @@ import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData; public class GetWeatherStationDataResult { - private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null); + private static GetWeatherStationDataResult NOT_FOUND = + new GetWeatherStationDataResult(null); private final WeatherStationData result; @@ -661,9 +672,6 @@ which represents the actual aggregation result for a weather station: ---- package org.acme.quarkus.sample.kafkastreams.model; -import io.quarkus.runtime.annotations.RegisterForReflection; - -@RegisterForReflection public class WeatherStationData { public int stationId; @@ -673,7 +681,8 @@ public class WeatherStationData { public int count; public double avg; - private WeatherStationData(int stationId, String stationName, double min, double max, int count, double avg) { + private WeatherStationData(int stationId, String stationName, double min, double max, + int count, double avg) { this.stationId = stationId; this.stationName = stationName; this.min = min; @@ -737,7 +746,8 @@ public class WeatherStationEndpoint { return Response.ok(result.getResult().get()).build(); } else { - return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build(); + return Response.status(Status.NOT_FOUND.getStatusCode(), + "No data found for weather station " + id).build(); } } } @@ -816,7 +826,12 @@ public GetWeatherStationDataResult getWeatherStationData(int id) { } } else { // <3> - LOG.info("Found data for key {} on remote host {}:{}", id, metadata.host(), metadata.port()); + LOG.info( + "Found data for key {} on remote host {}:{}", + id, + metadata.host(), + metadata.port() + ); return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port()); } } @@ -852,13 +867,15 @@ import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData; public class GetWeatherStationDataResult { - private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null, null, null); + private static GetWeatherStationDataResult NOT_FOUND = + new GetWeatherStationDataResult(null, null, null); private final WeatherStationData result; private final String host; private final Integer port; - private GetWeatherStationDataResult(WeatherStationData result, String host, Integer port) { + private GetWeatherStationDataResult(WeatherStationData result, String host, + Integer port) { this.result = result; this.host = host; this.port = port; @@ -954,11 +971,13 @@ public class WeatherStationEndpoint { return Response.ok(result.getResult().get()).build(); } else if (result.getHost().isPresent()) { // <2> - URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(), id); + URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(), + id); return Response.seeOther(otherUri).build(); } else { // <3> - return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build(); + return Response.status(Status.NOT_FOUND.getStatusCode(), + "No data found for weather station " + id).build(); } }