diff --git a/docs/src/main/asciidoc/images/kafka-streams-guide-architecture-distributed.png b/docs/src/main/asciidoc/images/kafka-streams-guide-architecture-distributed.png new file mode 100644 index 00000000000000..63a1d017021158 Binary files /dev/null and b/docs/src/main/asciidoc/images/kafka-streams-guide-architecture-distributed.png differ diff --git a/docs/src/main/asciidoc/images/kafka-streams-guide-architecture.png b/docs/src/main/asciidoc/images/kafka-streams-guide-architecture.png new file mode 100644 index 00000000000000..3f2140f6d620b0 Binary files /dev/null and b/docs/src/main/asciidoc/images/kafka-streams-guide-architecture.png differ diff --git a/docs/src/main/asciidoc/kafka-streams-guide.adoc b/docs/src/main/asciidoc/kafka-streams-guide.adoc new file mode 100644 index 00000000000000..c1e8a5f0cdde53 --- /dev/null +++ b/docs/src/main/asciidoc/kafka-streams-guide.adoc @@ -0,0 +1,1137 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/master/docs/src/main/asciidoc +//// += Quarkus - Using Apache Kafka Streams + +include::./attributes.adoc[] + +This guide demonstrates how your {project-name} application can utilize the Apache Kafka Streams API to implement stream processing applications based on Apache Kafka. + +== Prerequisites + +To complete this guide, you need: + +* less than 30 minutes +* an IDE +* JDK 1.8+ installed with `JAVA_HOME` configured appropriately +* Apache Maven 3.5.3+ +* Docker Compose to start an Apache Kafka development cluster +* GraalVM installed if you want to run in native mode. + +It is recommended that you read the {quickstarts-tree-url}/kafka-quickstart[Kafka quickstart] first. + +== Architecture + +In this guide, we are going to generate (random) temperature values in one component (named `generator`). +These values are associated with given weather stations and are written to a Kafka topic (`temperature-values`). 
+Another topic (`weather-stations`) contains just the master data about the weather stations themselves (id and name). + +A second component (`aggregator`) reads from the two Kafka topics and processes them in a streaming pipeline: + +* the two topics are joined on weather station id +* per weather station the min, max and average temperature are determined +* this aggregated data is written out to a third topic (`temperatures-aggregated`) + +The data can be examined by inspecting the output topic. +By exposing a Kafka Streams https://kafka.apache.org/22/documentation/streams/developer-guide/interactive-queries.html[interactive query], +the latest result for each weather station can alternatively be obtained via a simple REST query. + +The overall architecture looks like so: + +image::kafka-streams-guide-architecture.png[alt=Architecture] + +== Solution + +We recommend that you follow the instructions in the next sections and create the application step by step. +However, you can go right to the completed example. + +Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive]. + +The solution is located in the `kafka-streams-quickstart` {quickstarts-tree-url}/kafka-streams-quickstart[directory]. + +== Creating the Producer Maven Project + +First, we need a new project with the temperature value producer. +Create a new project with the following command: + +[source, subs=attributes+] +---- +mkdir producer && cd producer && \ +mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ + -DprojectGroupId=org.acme \ + -DprojectArtifactId=kafka-streams-quickstart-producer \ + -Dextensions="kafka" \ + && cd .. +---- + +This command generates a Maven project, importing the Reactive Messaging and Kafka connector extensions. 
+ +=== The Temperature Value Producer + +Create the `producer/src/main/java/org/acme/quarkus/sample/generator/ValuesGenerator.java` file, +with the following content: + +[source, java] +---- +package org.acme.quarkus.sample.generator; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.reactivex.Flowable; +import io.smallrye.reactive.messaging.kafka.KafkaMessage; + +/** + * A bean producing random temperature data every second. + * The values are written to a Kafka topic (temperature-values). + * Another topic contains the name of weather stations (weather-stations). + * The Kafka configuration is specified in the application configuration. 
+ */ +@ApplicationScoped +public class ValuesGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(ValuesGenerator.class); + + private Random random = new Random(); + + private List<WeatherStation> stations = Collections.unmodifiableList( + Arrays.asList( + new WeatherStation(1, "Hamburg", 13), + new WeatherStation(2, "Snowdonia", 5), + new WeatherStation(3, "Boston", 11), + new WeatherStation(4, "Tokio", 16), + new WeatherStation(5, "Cusco", 12), + new WeatherStation(6, "Svalbard", -7), + new WeatherStation(7, "Porthsmouth", 11), + new WeatherStation(8, "Oslo", 7), + new WeatherStation(9, "Marrakesh", 20) + )); + + + @Outgoing("temperature-values") // <1> + public Flowable<KafkaMessage<Integer, String>> generate() { + + return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2> + .map(tick -> { + WeatherStation station = stations.get(random.nextInt(stations.size())); + double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature) + .setScale(1, RoundingMode.HALF_UP) + .doubleValue(); + + LOG.info("station: {}, temperature: {}", station.name, temperature); + return KafkaMessage.of(station.id, Instant.now() + ";" + temperature); + }); + } + + @Outgoing("weather-stations") // <3> + public Flowable<KafkaMessage<Integer, String>> weatherStations() { + List<KafkaMessage<Integer, String>> stationsAsJson = stations.stream() + .map(s -> KafkaMessage.of(s.id, "{ \"id\" : " + s.id + ", \"name\" : \"" + s.name + "\" }")) + .collect(Collectors.toList()); + + return Flowable.fromIterable(stationsAsJson); + } + + private static class WeatherStation { + + int id; + String name; + int averageTemperature; + + public WeatherStation(int id, String name, int averageTemperature) { + this.id = id; + this.name = name; + this.averageTemperature = averageTemperature; + } + } +} +---- +<1> Instruct Reactive Messaging to dispatch the items from the returned `Flowable` to `temperature-values`. +<2> The method returns an RxJava 2 _stream_ (`Flowable`) emitting a random temperature value every 0.5 seconds. 
+<3> Instruct Reactive Messaging to dispatch the items from the returned `Flowable` (static list of weather stations) to `weather-stations`. + +The two methods each return a _reactive stream_ whose items are sent to the streams named `temperature-values` and `weather-stations`, respectively. + +=== Topic Configuration + +The two streams are mapped to Apache Kafka topics using the Quarkus configuration file `application.properties`. +For that, add the following to the file `producer/src/main/resources/application.properties`: + +[source] +---- +kafka.bootstrap.servers=localhost:9092 + +mp.messaging.outgoing.temperature-values.connector=smallrye-kafka +mp.messaging.outgoing.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer +mp.messaging.outgoing.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer + +mp.messaging.outgoing.weather-stations.connector=smallrye-kafka +mp.messaging.outgoing.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer +mp.messaging.outgoing.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer +---- + +This configures the Kafka bootstrap server, the two topics and the corresponding key and value serializers. +More details about the different configuration options are available in the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] sections of the Kafka documentation. + +== Creating the Aggregator Maven Project + +With the producer application in place, it's time to implement the actual aggregator application, +which will run the Kafka Streams pipeline. 
+Create another project like so: + +[source, subs=attributes+] +---- +mkdir aggregator && cd aggregator && \ +mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ + -DprojectGroupId=org.acme \ + -DprojectArtifactId=kafka-streams-quickstart-aggregator \ + -Dextensions="kafka-streams,resteasy-jsonb" \ + && cd .. +---- + +This creates the `aggregator` project with the Quarkus extension for Kafka Streams and with RESTEasy support for JSON-B. + +=== The Pipeline Implementation + +Let's begin the implementation of the stream processing application by creating +a few value objects for representing temperature measurements, weather stations and for keeping track of aggregated values. + +First, create the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/WeatherStation.java`, +representing a weather station, with the following content: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class WeatherStation { + + public int id; + public String name; +} +---- + +Then the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`, +representing temperature measurements for a given station: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import java.time.Instant; + +public class TemperatureMeasurement { + + public int stationId; + public String stationName; + public Instant timestamp; + public double value; + + public TemperatureMeasurement(int stationId, String stationName, Instant timestamp, double value) { + this.stationId = stationId; + this.stationName = stationName; + this.timestamp = timestamp; + this.value = value; + } +} +---- + +And finally `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/Aggregation.java`, +which will be used to keep track of the aggregated values while the events are processed in the streaming 
pipeline: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class Aggregation { + + public int stationId; + public String stationName; + public double min = Double.MAX_VALUE; + public double max = -Double.MAX_VALUE; // Double.MIN_VALUE is the smallest positive double, so it cannot seed a maximum of negative temperatures + public int count; + public double sum; + public double avg; + + public Aggregation updateFrom(TemperatureMeasurement measurement) { + stationId = measurement.stationId; + stationName = measurement.stationName; + + count++; + sum += measurement.value; + avg = BigDecimal.valueOf(sum / count) + .setScale(1, RoundingMode.HALF_UP).doubleValue(); + + min = Math.min(min, measurement.value); + max = Math.max(max, measurement.value); + + return this; + } +} +---- + +The `WeatherStation` and `Aggregation` types carry the `@RegisterForReflection` annotation, allowing them to be instantiated reflectively when running the application in native mode. 
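To get a feeling for how the aggregated values evolve as measurements arrive, here is a minimal, self-contained sketch of the same running min/max/average logic. The class `RunningAggregationSketch` is a hypothetical stand-in that is not part of the quickstart; it takes plain `double` values instead of `TemperatureMeasurement` objects and, as an assumption of this sketch, seeds `min` and `max` with the infinities so that all-negative temperatures are handled.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical stand-in for the numeric part of an aggregation type;
// not part of the quickstart sources.
class RunningAggregationSketch {

    // Seed values chosen so that the first measurement always replaces them.
    double min = Double.POSITIVE_INFINITY;
    double max = Double.NEGATIVE_INFINITY;
    int count;
    double sum;
    double avg;

    // Folds one more measurement into the running aggregate.
    RunningAggregationSketch updateFrom(double value) {
        count++;
        sum += value;
        // Average rounded to one decimal place.
        avg = BigDecimal.valueOf(sum / count)
                .setScale(1, RoundingMode.HALF_UP)
                .doubleValue();
        min = Math.min(min, value);
        max = Math.max(max, value);
        return this;
    }

    public static void main(String[] args) {
        RunningAggregationSketch aggregation = new RunningAggregationSketch();
        // Three sub-zero readings, as a cold station might report.
        for (double value : new double[] { -7.5, -3.0, -12.0 }) {
            aggregation.updateFrom(value);
        }
        System.out.println("min=" + aggregation.min
                + ", max=" + aggregation.max
                + ", avg=" + aggregation.avg
                + ", count=" + aggregation.count);
    }
}
```

Because each call only touches the previous aggregate and the new value, the same function can be handed to a streaming `aggregate()` step, which invokes it once per incoming record.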
+ +Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; + +import org.acme.quarkus.sample.kafkastreams.model.Aggregation; +import org.acme.quarkus.sample.kafkastreams.model.TemperatureMeasurement; +import org.acme.quarkus.sample.kafkastreams.model.WeatherStation; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.StreamsMetadata; +import 
org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; + +@ApplicationScoped +public class KafkaStreamsPipeline { + + private static final String WEATHER_STATIONS_STORE = "weather-stations-store"; + + private static final String WEATHER_STATIONS_TOPIC = "weather-stations"; + private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values"; + private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipeline.class); + + @ConfigProperty(name="org.acme.quarkus.sample.kafkastreams.bootstrap.servers", defaultValue="localhost:9092") + String bootstrapServers; + + @ConfigProperty(name="HOSTNAME") // <1> + String host; + + @ConfigProperty(name = "quarkus.http.port") // <1> + int port; + + private KafkaStreams streams; + + private ExecutorService executor; + + void onStart(@Observes StartupEvent ev) { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-aggregator"); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port); // <1> + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + StreamsBuilder builder = new StreamsBuilder(); + + JsonbSerde<WeatherStation> weatherStationSerde = new JsonbSerde<>(WeatherStation.class); + JsonbSerde<Aggregation> aggregationSerde = new JsonbSerde<>(Aggregation.class); + + KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(WEATHER_STATIONS_STORE); + + GlobalKTable<Integer, WeatherStation> stations = builder.globalTable( // <2> + WEATHER_STATIONS_TOPIC, + 
Consumed.with(Serdes.Integer(), weatherStationSerde)); + + builder.stream( // <3> + TEMPERATURE_VALUES_TOPIC, + Consumed.with(Serdes.Integer(), Serdes.String()) + ) + .join( // <4> + stations, + (stationId, timestampAndValue) -> stationId, + (timestampAndValue, station) -> { + String[] parts = timestampAndValue.split(";"); + return new TemperatureMeasurement(station.id, station.name, Instant.parse(parts[0]), Double.valueOf(parts[1])); + } + ) + .groupByKey() // <5> + .aggregate( // <6> + Aggregation::new, + (stationId, value, aggregation) -> aggregation.updateFrom(value), + Materialized.<Integer, Aggregation> as(storeSupplier) + .withKeySerde(Serdes.Integer()) + .withValueSerde(aggregationSerde) + ) + .toStream() + .to( // <7> + TEMPERATURES_AGGREGATED_TOPIC, + Produced.with(Serdes.Integer(), aggregationSerde) + ); + + streams = new KafkaStreams(builder.build(), props); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + waitForTopicsToBeCreated(bootstrapServers); // <8> + streams.start(); + }); + } + + void onStop(@Observes ShutdownEvent ev) { + streams.close(); + executor.shutdown(); + } + + private void waitForTopicsToBeCreated(String bootstrapServers) { + Map<String, Object> config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + try (AdminClient adminClient = AdminClient.create(config)) { + while (true) { + ListTopicsResult topics = adminClient.listTopics(); + Set<String> topicNames = topics.names().get(60, TimeUnit.SECONDS); + + if (topicNames.contains(WEATHER_STATIONS_TOPIC) && topicNames.contains(TEMPERATURE_VALUES_TOPIC)) { + return; + } + + LOG.info("Waiting for topics {} and {} to be created", WEATHER_STATIONS_TOPIC, TEMPERATURE_VALUES_TOPIC); + Thread.sleep(1_000); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } +} +---- +<1> Host and port of the application are injected and passed to the Kafka Streams configuration as this application node's 
identity; this will be used later on when running the interactive query in a distributed way +<2> The `weather-stations` topic is read into a `GlobalKTable`, representing the current state of each weather station +<3> The `temperature-values` topic is read into a `KStream`; whenever a new message arrives on this topic, the pipeline will be processed for this measurement +<4> The message from the `temperature-values` topic is joined with the corresponding weather station, using the topic's key (weather station id); the join result contains the data from the measurement and associated weather station message +<5> The values are grouped by message key (the weather station id) +<6> Within each group, all the measurements of that station are aggregated, by keeping track of minimum and maximum values and calculating the average value of all measurements of that station (see the `Aggregation` type) +<7> The results of the pipeline are written out to the `temperatures-aggregated` topic +<8> The Quarkus `StartupEvent` and `ShutdownEvent` lifecycle events are used for starting and stopping the pipeline; as it may only be started once the topics exist, the Kafka admin client is used to check for their existence continuously, until they have been set up + +Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents: + +[source] +---- +org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092 +---- + +== Building and Running the Applications + +We can now build the `producer` and `aggregator` applications: + +[source, subs=attributes+] +---- +mvn clean package -f producer/pom.xml +mvn clean package -f aggregator/pom.xml +---- + +Instead of running them directly on the host machine using the Quarkus dev mode, +we're going to package them into container images and launch them via Docker Compose. +This is done in order to demonstrate scaling the `aggregator` application to multiple nodes later on. 
+ +The `Dockerfile` created by Quarkus by default needs one adjustment for the `aggregator` application in order to run the Kafka Streams pipeline. +To do so, edit the file `aggregator/src/main/docker/Dockerfile.jvm` and replace the line `FROM fabric8/java-alpine-openjdk8-jre` with `FROM fabric8/java-centos-openjdk8-jdk`. + +Next create a Docker Compose file (`docker-compose.yaml`) for spinning up the two applications as well as Apache Kafka and ZooKeeper like so: + +[source, yaml] +---- +version: '3.5' + +services: + zookeeper: + image: strimzi/kafka:0.11.3-kafka-2.1.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] + ports: + - "2181:2181" + environment: + LOG_DIR: /tmp/logs + networks: + - kafkastreams-network + kafka: + image: strimzi/kafka:0.11.3-kafka-2.1.0 + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_NUM_PARTITIONS: 3 + networks: + - kafkastreams-network + + producer: + image: quarkus-quickstarts/kafka-streams-producer:1.0 + build: + context: producer + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafkastreams-network + + aggregator: + image: quarkus-quickstarts/kafka-streams-aggregator:1.0 + build: + context: aggregator + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + ORG_ACME_QUARKUS_SAMPLE_KAFKASTREAMS_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafkastreams-network + +networks: + kafkastreams-network: + name: ks 
+---- + +To launch all the containers, building the `producer` and `aggregator` container images, +run `docker-compose up --build`. + +You should see log statements from the `producer` application about messages being sent to the "temperature-values" topic. +Due to https://github.com/smallrye/smallrye-reactive-messaging/issues/128[an issue] in the SmallRye Reactive Messaging component, the `producer` application might not send events if it was started before Apache Kafka was up. +In this case, simply restart the `producer` +(run from another shell window, keeping the other services running): + +[source, subs=attributes+] +---- +docker-compose stop producer && docker-compose start producer +---- + +Now run an instance of the _debezium/tooling_ image, attaching to the same network all the other containers run in. +This image provides several useful tools such as _kafkacat_ and _httpie_: + +[source, subs=attributes+] +---- +docker run --tty --rm -i --network ks debezium/tooling:1.0 +---- + +Within the tooling container, run _kafkacat_ to examine the results of the streaming pipeline: + +[source, subs=attributes+] +---- +kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated + +{"avg":34.7,"count":4,"max":49.4,"min":16.8,"stationId":9,"stationName":"Marrakesh","sum":138.8} +{"avg":15.7,"count":1,"max":15.7,"min":15.7,"stationId":2,"stationName":"Snowdonia","sum":15.7} +{"avg":12.8,"count":7,"max":25.5,"min":-13.8,"stationId":7,"stationName":"Porthsmouth","sum":89.7} +... +---- + +You should see new values arrive as the producer continues to emit temperature measurements, +each value on the outbound topic showing the minimum, maximum and average temperature values of the represented weather station. + +== Interactive Queries + +Subscribing to the `temperatures-aggregated` topic is a great way to react to any new temperature values. 
+It's a bit wasteful though if you're just interested in the latest aggregated value for a given weather station. +This is where Kafka Streams interactive queries shine: +they let you directly query the underlying state store of the pipeline for the value associated with a given key. +By exposing a simple REST endpoint which queries the state store, +the latest aggregation result can be retrieved without having to subscribe to any Kafka topic. + +Let's begin by adding one more method to the `KafkaStreamsPipeline` class which obtains the current state for a given key: + +[source, java] +---- +public GetWeatherStationDataResult getWeatherStationData(int id) { + Aggregation result = getWeatherStationStore().get(id); + + if (result != null) { + return GetWeatherStationDataResult.found(WeatherStationData.from(result)); // <1> + } + else { + return GetWeatherStationDataResult.notFound(); // <2> + } +} + +private ReadOnlyKeyValueStore<Integer, Aggregation> getWeatherStationStore() { + while (true) { + try { + return streams.store(WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore()); + } catch (InvalidStateStoreException e) { + // ignore, store not ready yet + } + } +} +---- +<1> A value for the given station id was found, so that value will be returned +<2> No value was found, either because a non-existing station was queried or no measurement exists yet for the given station + +Also create the method's return type in the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/GetWeatherStationDataResult.java`: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.util.Optional; +import java.util.OptionalInt; + +import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData; + +public class GetWeatherStationDataResult { + + private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null); + + private final WeatherStationData result; + + private GetWeatherStationDataResult(WeatherStationData 
result) { + this.result = result; + } + + public static GetWeatherStationDataResult found(WeatherStationData data) { + return new GetWeatherStationDataResult(data); + } + + public static GetWeatherStationDataResult notFound() { + return NOT_FOUND; + } + + public Optional<WeatherStationData> getResult() { + return Optional.ofNullable(result); + } +} +---- + +Also create `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/WeatherStationData.java`, +which represents the actual aggregation result for a weather station: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class WeatherStationData { + + public int stationId; + public String stationName; + public double min = Double.MAX_VALUE; + public double max = Double.MIN_VALUE; + public int count; + public double avg; + + private WeatherStationData(int stationId, String stationName, double min, double max, int count, double avg) { + this.stationId = stationId; + this.stationName = stationName; + this.min = min; + this.max = max; + this.count = count; + this.avg = avg; + } + + public static WeatherStationData from(Aggregation aggregation) { + return new WeatherStationData( + aggregation.stationId, + aggregation.stationName, + aggregation.min, + aggregation.max, + aggregation.count, + aggregation.avg); + } +} +---- + +We now can add a simple REST endpoint (`aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/rest/WeatherStationEndpoint.java`), +which invokes `getWeatherStationData()` and returns the data to the client: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.rest; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import 
javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.acme.quarkus.sample.kafkastreams.streams.GetWeatherStationDataResult; +import org.acme.quarkus.sample.kafkastreams.streams.KafkaStreamsPipeline; + +@ApplicationScoped +@Path("/weather-stations") +public class WeatherStationEndpoint { + + @Inject + KafkaStreamsPipeline pipeline; + + @GET + @Path("/data/{id}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response getWeatherStationData(@PathParam("id") int id) { + GetWeatherStationDataResult result = pipeline.getWeatherStationData(id); + + if (result.getResult().isPresent()) { // <1> + return Response.ok(result.getResult().get()).build(); + } + else { + return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build(); + } + } +} +---- +<1> Depending on whether a value was obtained, either return that value or a 404 response + +With this code in place, it's time to rebuild the application and the `aggregator` service in Docker Compose: + +[source, subs=attributes+] +---- +mvn clean package -f aggregator/pom.xml +docker-compose stop aggregator +docker-compose up --build -d +---- + +This will rebuild the `aggregator` container and restart its service. +Once that's done, you can invoke the service's REST API to obtain the temperature data for one of the existing stations. +To do so, you can use `httpie` in the tooling container launched before: + +[source, subs=attributes+] +---- +http aggregator:8080/weather-stations/data/1 + +HTTP/1.1 200 OK +Connection: keep-alive +Content-Length: 85 +Content-Type: application/json +Date: Tue, 18 Jun 2019 19:29:16 GMT + +{ + "avg": 12.9, + "count": 146, + "max": 41.0, + "min": -25.6, + "stationId": 1, + "stationName": "Hamburg" +} +---- + +== Scaling Out + +A very interesting trait of Kafka Streams applications is that they can be scaled out, +i.e. 
the load and state can be distributed amongst multiple application instances running the same pipeline. +Each node will then contain a subset of the aggregation results, +but Kafka Streams provides you with https://kafka.apache.org/22/documentation/streams/developer-guide/interactive-queries.html#querying-remote-state-stores-for-the-entire-app[an API] to find out which node is hosting a given key. +The application can then either fetch the data directly from the other instance, or simply point the client to the location of that other node. + +The `KafkaStreamsPipeline` class must be adjusted slightly for this distributed architecture: + +[source, java] +---- +public GetWeatherStationDataResult getWeatherStationData(int id) { + StreamsMetadata metadata = streams.metadataForKey( // <1> + WEATHER_STATIONS_STORE, + id, + Serdes.Integer().serializer() + ); + + if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) { + LOG.warn("Found no metadata for key {}", id); + return GetWeatherStationDataResult.notFound(); + } + else if (metadata.host().equals(host)) { // <2> + LOG.info("Found data for key {} locally", id); + Aggregation result = getWeatherStationStore().get(id); + + if (result != null) { + return GetWeatherStationDataResult.found(WeatherStationData.from(result)); + } + else { + return GetWeatherStationDataResult.notFound(); + } + } + else { // <3> + LOG.info("Found data for key {} on remote host {}:{}", id, metadata.host(), metadata.port()); + return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port()); + } +} + +public List<PipelineMetadata> getMetaData() { // <4> + return streams.allMetadataForStore(WEATHER_STATIONS_STORE) + .stream() + .map(m -> new PipelineMetadata( + m.hostInfo().host() + ":" + m.hostInfo().port(), + m.topicPartitions() + .stream() + .map(TopicPartition::toString) + .collect(Collectors.toSet())) + ) + .collect(Collectors.toList()); +} +---- +<1> The streams metadata for the given weather station id is obtained 
+<2> The given key (weather station id) is maintained by the local application node, i.e. it can answer the query itself +<3> The given key is maintained by another application node; in this case the information about that node (host and port) will be returned +<4> The `getMetaData()` method is added to provide callers with a list of all the nodes in the application cluster. + +The `GetWeatherStationDataResult` type must be adjusted accordingly: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.util.Optional; +import java.util.OptionalInt; + +import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData; + +public class GetWeatherStationDataResult { + + private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null, null, null); + + private final WeatherStationData result; + private final String host; + private final Integer port; + + private GetWeatherStationDataResult(WeatherStationData result, String host, Integer port) { + this.result = result; + this.host = host; + this.port = port; + } + + public static GetWeatherStationDataResult found(WeatherStationData data) { + return new GetWeatherStationDataResult(data, null, null); + } + + public static GetWeatherStationDataResult foundRemotely(String host, int port) { + return new GetWeatherStationDataResult(null, host, port); + } + + public static GetWeatherStationDataResult notFound() { + return NOT_FOUND; + } + + public Optional<WeatherStationData> getResult() { + return Optional.ofNullable(result); + } + + public Optional<String> getHost() { + return Optional.ofNullable(host); + } + + public OptionalInt getPort() { + return port != null ? 
OptionalInt.of(port) : OptionalInt.empty(); + } +} +---- + +The return type of `getMetaData()` must also be defined +(`aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/PipelineMetadata.java`): + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.util.Set; + +public class PipelineMetadata { + + public String host; + public Set<String> partitions; + + public PipelineMetadata(String host, Set<String> partitions) { + this.host = host; + this.partitions = partitions; + } +} +---- + +Lastly, the REST endpoint class must be updated: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.rest; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.acme.quarkus.sample.kafkastreams.streams.GetWeatherStationDataResult; +import org.acme.quarkus.sample.kafkastreams.streams.KafkaStreamsPipeline; +import org.acme.quarkus.sample.kafkastreams.streams.PipelineMetadata; + +@ApplicationScoped +@Path("/weather-stations") +public class WeatherStationEndpoint { + + @Inject + KafkaStreamsPipeline pipeline; + + @GET + @Path("/data/{id}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response getWeatherStationData(@PathParam("id") int id) { + GetWeatherStationDataResult result = pipeline.getWeatherStationData(id); + + if (result.getResult().isPresent()) { // <1> + return Response.ok(result.getResult().get()).build(); + } + else if (result.getHost().isPresent()) { // <2> + URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(), id); + return Response.seeOther(otherUri).build(); + } + else { 
// <3> + return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build(); + } + } + + @GET + @Path("/meta-data") + @Produces(MediaType.APPLICATION_JSON) + public List<PipelineMetadata> getMetaData() { // <4> + return pipeline.getMetaData(); + } + + private URI getOtherUri(String host, int port, int id) { + try { + return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } +} +---- +<1> The data was found locally, so return it +<2> The data is maintained by another node, so reply with a redirect (HTTP status code 303) pointing to that node +<3> No data was found for the given weather station id +<4> Exposes information about all the hosts forming the application cluster + +Now stop the `aggregator` service again and rebuild it. +Then let's spin up three instances of it: + +[source, subs=attributes+] +---- +mvn clean package -f aggregator/pom.xml +docker-compose stop aggregator +docker-compose up --build -d --scale aggregator=3 +---- + +When you invoke the REST API on any of the three instances, +the aggregation for the requested weather station id may be stored locally on the node receiving the query, +or on one of the other two nodes. + +As the load balancer of Docker Compose will distribute requests to the `aggregator` service in a round-robin fashion, +we'll invoke the actual nodes directly. 
+The application exposes information about all the host names via REST: + +[source, subs=attributes+] +---- +http aggregator:8080/weather-stations/meta-data + +HTTP/1.1 200 OK +Connection: keep-alive +Content-Length: 202 +Content-Type: application/json +Date: Tue, 18 Jun 2019 20:00:23 GMT + +[ + { + "host": "2af13fe516a9:8080", + "partitions": [ + "temperature-values-2" + ] + }, + { + "host": "32cc8309611b:8080", + "partitions": [ + "temperature-values-1" + ] + }, + { + "host": "1eb39af8d587:8080", + "partitions": [ + "temperature-values-0" + ] + } +] +---- + +Retrieve the data from one of the three hosts shown in the response +(your actual host names will differ): + +[source, subs=attributes+] +---- +http 2af13fe516a9:8080/weather-stations/data/1 +---- + +If that node holds the data for key "1", you'll get a response like this: + +``` +HTTP/1.1 200 OK +Connection: keep-alive +Content-Length: 74 +Content-Type: application/json +Date: Tue, 11 Jun 2019 19:16:31 GMT + +{ + "avg": 11.9, + "count": 259, + "max": 50.0, + "min": -30.1, + "stationId": 1, + "stationName": "Hamburg" +} +``` + +Otherwise, the service will send a redirect: + +``` +HTTP/1.1 303 See Other +Connection: keep-alive +Content-Length: 0 +Date: Tue, 18 Jun 2019 20:01:03 GMT +Location: http://1eb39af8d587:8080/weather-stations/data/1 +``` + +You can also have _httpie_ automatically follow the redirect by passing the `--follow` option: + +```bash +http --follow 2af13fe516a9:8080/weather-stations/data/1 +``` + +== Running Native + +To run the `producer` and `aggregator` applications as native binaries via GraalVM, +the Maven builds can be run using the `native` profile. 
+As RocksDB, a dependency of Kafka Streams, uses JNI (the Java Native Interface), +one adjustment to the `pom.xml` file of the `aggregator` application is needed: +add the line `<enableJni>true</enableJni>` to the configuration of the `native-image` execution of the Quarkus Maven plug-in, +which should then look like so: + +[source,xml] +---- +<plugin> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-maven-plugin</artifactId> + <version>${quarkus.version}</version> + <executions> + <execution> + <goals> + <goal>native-image</goal> + </goals> + <configuration> + <enableHttpUrlHandler>true</enableHttpUrlHandler> + <enableJni>true</enableJni> + </configuration> + </execution> + </executions> +</plugin> +---- + +Then build the two applications like so: + +[source, shell] +---- +mvn clean package -f producer/pom.xml -Pnative -Dnative-image.container-runtime=docker +mvn clean package -f aggregator/pom.xml -Pnative -Dnative-image.container-runtime=docker +---- + +Now create an environment variable named `QUARKUS_MODE` with its value set to "native": + +[source, shell] +---- +export QUARKUS_MODE=native +---- + +This is used by the Docker Compose file to use the correct `Dockerfile` when building the `producer` and `aggregator` images. +The Kafka Streams application can work with less than 50 MB RSS in native mode. +To do so, add the `Xmx` option to the program invocation in `aggregator/src/main/docker/Dockerfile.native`: + +[source, shell] +---- +CMD ["./application", "-Dquarkus.http.host=0.0.0.0", "-Xmx32m"] +---- + +Now start Docker Compose as described above +(don't forget to rebuild the container images). + +== Going Further + +This guide has shown how you can build stream processing applications using Quarkus and the Kafka Streams APIs, +both in JVM and native modes. +For running your KStreams application in production, you could also add health checks and metrics for the data pipeline. +Refer to the Quarkus guides on link:/guides/metrics-guide[metrics] and link:/guides/health-guide[health checks] to learn more.
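
As a recap of the distributed query logic used in this guide, the three-way decision made for each request (answer locally, redirect to the owning node, or report "not found") can be sketched without any Kafka dependency. This is only an illustrative sketch: the `LookupRouter` class, its `ownerByKey` map and the node names are hypothetical stand-ins for the lookup that `streams.metadataForKey(...)` performs in the real pipeline, and are not part of the quickstart.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a plain map replaces the Kafka Streams metadata lookup.
final class LookupRouter {

    enum Decision { LOCAL, REMOTE, NOT_FOUND }

    private final String localNode;                 // "host:port" of this instance
    private final Map<Integer, String> ownerByKey;  // station id -> "host:port" of the owning node

    LookupRouter(String localNode, Map<Integer, String> ownerByKey) {
        this.localNode = localNode;
        this.ownerByKey = ownerByKey;
    }

    // Mirrors the three branches of getWeatherStationData(): local, remote, unknown key.
    Decision decide(int stationId) {
        String owner = ownerByKey.get(stationId);
        if (owner == null) {
            return Decision.NOT_FOUND;
        }
        return owner.equals(localNode) ? Decision.LOCAL : Decision.REMOTE;
    }

    // For remotely owned keys, builds the URI a 303 response would carry in its Location header.
    Optional<URI> redirectTarget(int stationId) {
        if (decide(stationId) != Decision.REMOTE) {
            return Optional.empty();
        }
        return Optional.of(URI.create(
                "http://" + ownerByKey.get(stationId) + "/weather-stations/data/" + stationId));
    }

    public static void main(String[] args) {
        Map<Integer, String> owners = new HashMap<>();
        owners.put(1, "node-a:8080");
        owners.put(2, "node-b:8080");
        LookupRouter router = new LookupRouter("node-a:8080", owners);

        System.out.println(router.decide(1));                      // key owned by this node
        System.out.println(router.redirectTarget(2).orElse(null)); // key owned by another node
        System.out.println(router.decide(3));                      // unknown key
    }
}
```

In the actual application the ownership information changes whenever Kafka Streams rebalances partitions, so it has to be looked up per request rather than cached in a static map as done here.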