diff --git a/docs/src/main/asciidoc/kafka-streams-guide.adoc b/docs/src/main/asciidoc/kafka-streams-guide.adoc new file mode 100644 index 00000000000000..e44b57e589347c --- /dev/null +++ b/docs/src/main/asciidoc/kafka-streams-guide.adoc @@ -0,0 +1,1178 @@ +//// +This guide is maintained in the main Quarkus repository +and pull requests should be submitted there: +https://github.com/quarkusio/quarkus/tree/master/docs/src/main/asciidoc +//// += Quarkus - Using Apache Kafka Streams + +include::./attributes.adoc[] + +This guide demonstrates how your {project-name} application can utilize the Apache Kafka Streams API to implement stream processing applications based on Apache Kafka. + +== Prerequisites + +To complete this guide, you need: + +* less than 15 minutes +* an IDE +* JDK 1.8+ installed with `JAVA_HOME` configured appropriately +* Apache Maven 3.5.3+ +* Docker Compose to start an Apache Kafka development cluster +* GraalVM installed if you want to run in native mode. + +It is recommended, that you have read the {quickstarts-tree-url}/kafka-quickstart[Kafka quickstart] before. + +== Architecture + +In this guide, we are going to generate (random) temperature values in one component. +These values are associated to given weather stations and are written in a Kafka topic (`temperature-values`). +Another topic (`weather-stations`) contains just the master data about the weather stations themselves (id and name). + +A second component reads from the two Kafka topics and processes them in a streaming pipeline: + +* the two topics are joined on weather station id +* per weather station the min, max and average temperature is determined +* this aggregated data is written out to a third topic (`temperatures-aggregated`) + +The data can be examined by inspecting the output topic. +By exposing a Kafka Streams interactive query, the latest result for each weather station can alternatively be obtained via a simple REST endpoint. + +image::kafka-streams-guide-architecture.png[alt=Architecture] + +== Solution + +We recommend that you follow the instructions in the next sections and create the application step by step. +However, you can go right to the completed example. + +Clone the Git repository: `git clone {quickstarts-clone-url}`, or download an {quickstarts-archive-url}[archive]. + +The solution is located in the `kafka-quickstart` {quickstarts-tree-url}/kafka-streams-quickstart[directory]. + +== Creating the Producer Maven Project + +First, we need a new project with the temperature value producer. +Create a new project with the following command: + +[source, subs=attributes+] +---- +mkdir producer && cd producer && \ +mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ + -DprojectGroupId=org.acme \ + -DprojectArtifactId=kafka-streams-quickstart-producer \ + -Dextensions="reactive-kafka" \ + && cd .. +---- + +This command generates a Maven project, importing the Reactive Messaging and Kafka connector extensions. + +=== The Temperature Value Producer + +Create the `producer/src/main/java/org/acme/quarkus/sample/generator/ValuesGenerator.java` file, +with the following content: + +[source, java] +---- +package org.acme.quarkus.sample.generator; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.reactivex.Flowable; +import io.smallrye.reactive.messaging.kafka.KafkaMessage; + +/** + * A bean producing random temperature data every second. + * The values are written to a Kafka topic (temperature-values). + * Another topic contains the name of weather stations (weather-stations). + * The Kafka configuration is specified in the application configuration. + */ +@ApplicationScoped +public class ValuesGenerator { + + private static final Logger LOG = LoggerFactory.getLogger(ValuesGenerator.class); + + private Random random = new Random(); + + private List stations = Collections.unmodifiableList( + Arrays.asList( + new WeatherStation(1, "Hamburg", 13), + new WeatherStation(2, "Snowdonia", 5), + new WeatherStation(3, "Boston", 11), + new WeatherStation(4, "Tokio", 16), + new WeatherStation(5, "Cusco", 12), + new WeatherStation(6, "Svalbard", -7), + new WeatherStation(7, "Porthsmouth", 11), + new WeatherStation(8, "Oslo", 7), + new WeatherStation(9, "Marrakesh", 20) + )); + + + @Outgoing("temperature-values") // <1> + public Flowable> generate() { + + return Flowable.interval(500, TimeUnit.MILLISECONDS) // <2> + .map(tick -> { + WeatherStation station = stations.get(random.nextInt(stations.size())); + double temperature = new BigDecimal(random.nextGaussian() * 15 + station.averageTemperature) + .setScale(1, RoundingMode.HALF_UP) + .doubleValue(); + + LOG.info("station: {}, temperature: {}", station.name, temperature); + return KafkaMessage.of(station.id, Instant.now() + ";" + temperature); + }); + } + + @Outgoing("weather-stations") // <3> + public Flowable> weatherStations() { + List> stationsAsJson = stations.stream() + .map(s -> KafkaMessage.of(s.id, "{ \"id\" : " + s.id + ", \"name\" : \"" + s.name + "\" }")) + .collect(Collectors.toList()); + + return Flowable.fromIterable(stationsAsJson); + }; + + private static class WeatherStation { + + int id; + String name; + int averageTemperature; + + public WeatherStation(int id, String name, int averageTemperature) { + this.id = id; + this.name = name; + this.averageTemperature = averageTemperature; + } + } +} +---- +<1> Instruct Reactive Messaging to dispatch the items from the returned `Flowable` to `temperature-values`. +<2> The method returns a RX Java 2 _stream_ (`Flowable`) emitting a random temperature value every 0.5 seconds. +<3> Instruct Reactive Messaging to dispatch the items from the returned `Flowable` (static list of weather stations) to `weather-stations`. + +The two method each return a _reactive stream_ whose items are sent to the streams named `temperature-values` and `weather-stations`, respectively. + +=== Topic Configuration + +The two streams are mapped to Kafka Apache topics using the Quarkus configuration file `application.properties`. +For that, create the file `producer/src/main/resources/application.properties` with the following contents: + +[source] +---- +smallrye.messaging.sink.temperature-values.type=io.smallrye.reactive.messaging.kafka.Kafka +smallrye.messaging.sink.temperature-values.topic=temperature-values +smallrye.messaging.sink.temperature-values.bootstrap.servers=localhost:9092 +smallrye.messaging.sink.temperature-values.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer +smallrye.messaging.sink.temperature-values.value.serializer=org.apache.kafka.common.serialization.StringSerializer +smallrye.messaging.sink.temperature-values.acks=1 + +smallrye.messaging.sink.weather-stations.type=io.smallrye.reactive.messaging.kafka.Kafka +smallrye.messaging.sink.weather-stations.topic=weather-stations +smallrye.messaging.sink.weather-stations.bootstrap.servers=localhost:9092 +smallrye.messaging.sink.weather-stations.key.serializer=org.apache.kafka.common.serialization.IntegerSerializer +smallrye.messaging.sink.weather-stations.value.serializer=org.apache.kafka.common.serialization.StringSerializer +smallrye.messaging.sink.weather-stations.acks=1 +---- + +More details about the different configuration options is available on the https://kafka.apache.org/documentation/#producerconfigs[Producer configuration] and https://kafka.apache.org/documentation/#consumerconfigs[Consumer configuration] section from the Kafka documentation. + +== Creating the Aggregator Maven Project + +With the producer application in place, it's time to implement the actual aggregator application, +which will run the Kafka Streams pipeline. +Create another project like so: + +[source, subs=attributes+] +---- +mkdir aggregator && cd aggregator && \ +mvn io.quarkus:quarkus-maven-plugin:{quarkus-version}:create \ + -DprojectGroupId=org.acme \ + -DprojectArtifactId=kafka-streams-quickstart-aggregator \ + -Dextensions="kafka-streams,resteasy-jsonb" \ + && cd .. +---- + +This creates the "aggregator" project with the Quarkus extension for Kafka Streams and with RESTEasy support for JSON-B. + +=== The Pipeline Implementation + +Let's begin the implementation of the stream processing application by creating +a few value objects for representing temperature measurements, weather stations and for keeping track of aggregated values. + +First, create the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/WeatherStation.java`, +with the following content: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class WeatherStation { + + public int id; + public String name; +} +---- + +Then `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/TemperatureMeasurement.java`: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import java.time.Instant; + +public class TemperatureMeasurement { + + public int stationId; + public String stationName; + public Instant timestamp; + public double value; + + public TemperatureMeasurement(int stationId, String stationName, Instant timestamp, double value) { + this.stationId = stationId; + this.stationName = stationName; + this.timestamp = timestamp; + this.value = value; + } +} +---- + +And finally `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/Aggregation.java`: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class Aggregation { + + public int stationId; + public String stationName; + public double min = Double.MAX_VALUE; + public double max = Double.MIN_VALUE; + public int count; + public double sum; + + public Aggregation updateFrom(TemperatureMeasurement measurement) { + stationId = measurement.stationId; + stationName = measurement.stationName; + + count++; + sum += measurement.value; + + min = Math.min(min, measurement.value); + max = Math.max(max, measurement.value); + + return this; + } +} +---- + +All these types carry the `@RegisterForReflection`, allowing them to be instantiated reflectively when running the application in native mode. + +Next, let's create the actual streaming query implementation itself in the `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/KafkaStreamsPipeline.java` file: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import javax.enterprise.context.ApplicationScoped; +import javax.enterprise.event.Observes; + +import org.acme.quarkus.sample.kafkastreams.model.Aggregation; +import org.acme.quarkus.sample.kafkastreams.model.TemperatureMeasurement; +import org.acme.quarkus.sample.kafkastreams.model.WeatherStation; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.GlobalKTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.StreamsMetadata; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.runtime.ShutdownEvent; +import io.quarkus.runtime.StartupEvent; + +@ApplicationScoped +public class KafkaStreamsPipeline { + + private static final String WEATHER_STATIONS_STORE = "weather-stations-store"; + + private static final String WEATHER_STATIONS_TOPIC = "weather-stations"; + private static final String TEMPERATURE_VALUES_TOPIC = "temperature-values"; + private static final String TEMPERATURES_AGGREGATED_TOPIC = "temperatures-aggregated"; + + private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsPipeline.class); + + @ConfigProperty(name="org.acme.quarkus.sample.kafkastreams.bootstrap.servers", defaultValue="localhost:9092") + String bootstrapServers; + + @ConfigProperty(name="HOSTNAME") + String host; + + @ConfigProperty(name = "quarkus.http.port") + int port; + + private KafkaStreams streams; + + private ExecutorService executor; + + void onStart(@Observes StartupEvent ev) { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "temperature-aggregator"); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024); + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + StreamsBuilder builder = new StreamsBuilder(); + + JsonbSerde weatherStationSerde = new JsonbSerde<>(WeatherStation.class); + JsonbSerde aggregationSerde = new JsonbSerde<>(Aggregation.class); + + KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore(WEATHER_STATIONS_STORE); + + GlobalKTable stations = builder.globalTable( + WEATHER_STATIONS_TOPIC, + Consumed.with(Serdes.Integer(), weatherStationSerde)); + + builder.stream( + TEMPERATURE_VALUES_TOPIC, + Consumed.with(Serdes.Integer(), Serdes.String()) + ) + .join( + stations, + (stationId, timestampAndValue) -> stationId, + (timestampAndValue, station) -> { + String[] parts = timestampAndValue.split(";"); + return new TemperatureMeasurement(station.id, station.name, Instant.parse(parts[0]), Double.valueOf(parts[1])); + } + ) + .groupByKey() + .aggregate( + Aggregation::new, + (stationId, value, aggregtion) -> aggregtion.updateFrom(value), + Materialized. as(storeSupplier) + .withKeySerde(Serdes.Integer()) + .withValueSerde(aggregationSerde) + ) + .toStream() + .to( + TEMPERATURES_AGGREGATED_TOPIC, + Produced.with(Serdes.Integer(), aggregationSerde) + ); + + streams = new KafkaStreams(builder.build(), props); + + executor = Executors.newSingleThreadExecutor(); + executor.execute(() -> { + waitForTopicsToBeCreated(bootstrapServers); + streams.start(); + }); + } + + void onStop(@Observes ShutdownEvent ev) { + streams.close(); + executor.shutdown(); + } + + private void waitForTopicsToBeCreated(String bootstrapServers) { + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + + try (AdminClient adminClient = AdminClient.create(config)) { + while (true) { + ListTopicsResult topics = adminClient.listTopics(); + Set topicNames = topics.names().get(60, TimeUnit.SECONDS); + + if (topicNames.contains(WEATHER_STATIONS_TOPIC) && topicNames.contains(TEMPERATURE_VALUES_TOPIC)) { + return; + } + + LOG.info("Waiting for topics {} and {} to be created", WEATHER_STATIONS_TOPIC, TEMPERATURE_VALUES_TOPIC); + Thread.sleep(1_000); + } + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } +} +---- + +The SerDe (serializer/deserializer) for storing and retrieving the different value types in/from Kafka topics +is defined in the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/JsonbSerde.java` and looks like so: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import javax.json.bind.Jsonb; +import javax.json.bind.JsonbBuilder; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +/** + * A {@link Serde} that (de-)serializes JSON. + */ +public class JsonbSerde implements Serde { + + private final Class type; + private final Jsonb jsonb = JsonbBuilder.create(); + + public JsonbSerde(Class type) { + this.type = type; + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public void close() { + try { + jsonb.close(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public Serializer serializer() { + return new JsonSerializer(); + } + + @Override + public Deserializer deserializer() { + return new JsonDeserializer(type); + } + + private final class JsonDeserializer implements Deserializer { + + private final Class type; + + public JsonDeserializer(Class type) { + this.type = type; + } + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public U deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + + try (InputStream is = new ByteArrayInputStream(data)) { + return jsonb.fromJson(is, type); + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + } + + private final class JsonSerializer implements Serializer { + + @Override + public void configure(Map configs, boolean isKey) { + } + + @Override + public byte[] serialize(String topic, U data) { + try (ByteArrayOutputStream output = new ByteArrayOutputStream()) { + jsonb.toJson(data, output); + return output.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + } +} +---- + +Lastly, create the Quarkus configuration file `aggregator/src/main/resources/application.properties` with the following contents: + +[source] +---- +org.acme.quarkus.sample.kafkastreams.bootstrap.servers=localhost:9092 +---- + +== Building and Running the Applications + +We now can build the `producer` and `aggregator` applications: + +[source, subs=attributes+] +---- +mvn clean package -f producer/pom.xml +mvn clean package -f aggregator/pom.xml +---- + +Instead of running them directly on the host machine using the Quarkus dev mode, +we're going to package them into container images and launch them via Docker Compose. +This is done in order to demonstrate scaling the `aggregator` aggregation to multiple nodes later on. + +The `Dockerfile` created by Quarkus by default needs one adjustment for the `aggregator` application in order to run the Kafka Streams pipeline. +To do so, Edit the file `aggregator/src/main/docker/Dockerfile.jvm` and replace the line `FROM fabric8/java-alpine-openjdk8-jre` with `FROM fabric8/java-centos-openjdk8-jdk`. + +Next create a Docker Compose file for spinning up the two applications as well as Apache Kafka and ZooKeeper like so: + +[source, yaml] +---- + version: '3.5' + + services: + zookeeper: + image: strimzi/kafka:0.11.3-kafka-2.1.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] + ports: + - "2181:2181" + environment: + LOG_DIR: /tmp/logs + networks: + - kafkastreams-network + kafka: + image: strimzi/kafka:0.11.3-kafka-2.1.0 + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT} --override num.partitions=$${KAFKA_NUM_PARTITIONS}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + LOG_DIR: "/tmp/logs" + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_NUM_PARTITIONS: 3 + networks: + - kafkastreams-network + + producer: + image: quarkus-quickstarts/kafka-streams-producer:1.0 + build: + context: producer + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + SMALLRYE_MESSAGING_SINK_WEATHER_STATIONS_BOOTSTRAP_SERVERS: kafka:9092 + SMALLRYE_MESSAGING_SINK_TEMPERATURE_VALUES_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafkastreams-network + + aggregator: + image: quarkus-quickstarts/kafka-streams-aggregator:1.0 + build: + context: aggregator + dockerfile: src/main/docker/Dockerfile.${QUARKUS_MODE:-jvm} + environment: + ORG_ACME_QUARKUS_SAMPLE_KAFKASTREAMS_BOOTSTRAP_SERVERS: kafka:9092 + networks: + - kafkastreams-network + + networks: + kafkastreams-network: + name: ks +---- + +To launch Kafka, ZooKeeper and the `aggregator` application, run `docker-compose up --build --scale producer=0`. + +Once all containers are up, launch the `producer` application +(this deferred start is currently needed due to [an issue](https://github.com/smallrye/smallrye-reactive-messaging/issues/128)) +in the SmallRye Reactive Messaging component) via `docker-compose up -d`. + +You should see log statements about messages being sent to the "temperature-values" topic. +Now run an instance of the _debezium/tooling_ image which comes with several useful tools such as _kafkacat_ and _httpie_: + +[source, subs=attributes+] +---- +docker run --tty --rm -i --network ks debezium/tooling:1.0 +---- + +Within the tooling container, run _kafkacat_ to examine the results of the streaming pipeline: + +[source, subs=attributes+] +---- +kafkacat -b kafka:9092 -C -o beginning -q -t temperatures-aggregated +---- + +You should see new values arrive as the producer continues to emit temperature measurements, +each value on the outbound topic showing the mininum, maxium and average temperature values of the represented weather station. + +== Interactive Queries + +Subscribing to the `temperatures-aggregated` topic is a great way to react to any new temperature values. +It's a bit wasteful though if you're interested just in the latest aggregated value for a given weather station. +this is where Kafka Streams interactive queries shine: +they let you directly query the underlying state store of the pipeline for the value associated to a given key. +By exposing a simple REST endpoint which queries the state store, +the latest aggregation result can be retrieved without having to subscribe to any Kafka topic. + +Let's begin by adding one more method to the `KafkaStreamsPipeline` class which obtains the current state for a given key: + +[source, java] +---- +public GetWeatherStationDataResult getWeatherStationData(int id) { + Aggregation result = getWeatherStationStore().get(id); + + if (result != null) { + return GetWeatherStationDataResult.found(WeatherStationData.from(result)); + } + else { + return GetWeatherStationDataResult.notFound(); + } +} + +private ReadOnlyKeyValueStore getWeatherStationStore() { + while (true) { + try { + return streams.store(WEATHER_STATIONS_STORE, QueryableStoreTypes.keyValueStore()); + } catch (InvalidStateStoreException e) { + // ignore, store not ready yet + } + } +} +---- + +Also create the method's return type in the file `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/GetWeatherStationDataResult.java`: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.util.Optional; +import java.util.OptionalInt; + +import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData; + +public class GetWeatherStationDataResult { + + private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null); + + private final WeatherStationData result; + + private GetWeatherStationDataResult(WeatherStationData result) { + this.result = result; + } + + public static GetWeatherStationDataResult found(WeatherStationData data) { + return new GetWeatherStationDataResult(data); + } + + public static GetWeatherStationDataResult notFound() { + return NOT_FOUND; + } + + public Optional getResult() { + return Optional.ofNullable(result); + } +} +---- + +And `aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/model/WeatherStationData.java`, +which represents the actual aggregation result for a weather station: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.model; + +import java.math.BigDecimal; +import java.math.RoundingMode; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +@RegisterForReflection +public class WeatherStationData { + + public int stationId; + public String stationName; + public double min = Double.MAX_VALUE; + public double max = Double.MIN_VALUE; + public int count; + public double avg; + + private WeatherStationData(int stationId, String stationName, double min, double max, int count, double avg) { + this.stationId = stationId; + this.stationName = stationName; + this.min = min; + this.max = max; + this.count = count; + this.avg = avg; + } + + public static WeatherStationData from(Aggregation aggregation) { + double avg = BigDecimal.valueOf(aggregation.sum / aggregation.count) + .setScale(1, RoundingMode.HALF_UP).doubleValue(); + + return new WeatherStationData( + aggregation.stationId, + aggregation.stationName, + aggregation.min, + aggregation.max, + aggregation.count, + avg); + } +} +---- + +We now can add a simple REST endpoint (`aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/rest/WeatherStationEndpoint.java`), +which invokes `getWeatherStationData()` and returns the data to the client: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.rest; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.acme.quarkus.sample.kafkastreams.streams.GetWeatherStationDataResult; +import org.acme.quarkus.sample.kafkastreams.streams.KafkaStreamsPipeline; + +@ApplicationScoped +@Path("/weather-stations") +public class WeatherStationEndpoint { + + @Inject + KafkaStreamsPipeline pipeline; + + @GET + @Path("/data/{id}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response getWeatherStationData(@PathParam("id") int id) { + GetWeatherStationDataResult result = pipeline.getWeatherStationData(id); + + if (result.getResult().isPresent()) { + return Response.ok(result.getResult().get()).build(); + } + else { + return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build(); + } + } +} +---- + +With this code in place, it's time to rebuild the application and the `aggregator` service in Docker Compose: + +[source, subs=attributes+] +---- +mvn clean package -f aggregator/pom.xml +docker-compose stop aggregator +docker-compose up --build -d +---- + +This will rebuild the `aggregator` container and restart its service. +Once that's done, you can invoke the service's REST API to obtain the temperature data for one of the existing stations. +To do so, you can use `httpie` in the tooling container launched before: + +[source, subs=attributes+] +---- +http aggregator:8080/weather-stations/data/1 + +HTTP/1.1 200 OK +Connection: keep-alive +Content-Length: 85 +Content-Type: application/json +Date: Tue, 18 Jun 2019 19:29:16 GMT + +{ + "avg": 12.9, + "count": 146, + "max": 41.0, + "min": -25.6, + "stationId": 1, + "stationName": "Hamburg" +} +---- + +== Scaling Out + +A very interesting trait of Kafka Streams applications is that they can be scaled out, +i.e. the load and state can be distributed amongst multiple application instances running the same pipeline. +Each node will then contain a subset of the aggregation results, +but Kafka Streams provides you with an API to obtain the information which node is hosting a given key. +The application can then either fetch the data directly from the other instance or simply point the client to the location of that other node. + +The `KafkaStreamsPipeline` class must be adjusted slightly for this distributed architecture: + +[source, java] +---- +public GetWeatherStationDataResult getWeatherStationData(int id) { + StreamsMetadata metadata = streams.metadataForKey( + WEATHER_STATIONS_STORE, + id, + Serdes.Integer().serializer() + ); + + if (metadata == null || metadata == StreamsMetadata.NOT_AVAILABLE) { + LOG.warn("Found no metadata for key {}", id); + return GetWeatherStationDataResult.notFound(); + } + else if (metadata.host().equals(host)) { + LOG.info("Found data for key {} locally", id); + Aggregation result = getWeatherStationStore().get(id); + + if (result != null) { + return GetWeatherStationDataResult.found(WeatherStationData.from(result)); + } + else { + return GetWeatherStationDataResult.notFound(); + } + } + else { + LOG.info("Found data for key {} on remote host {}:{}", id, metadata.host(), metadata.port()); + return GetWeatherStationDataResult.foundRemotely(metadata.host(), metadata.port()); + } +} + +public List getMetaData() { + return streams.allMetadataForStore(WEATHER_STATIONS_STORE) + .stream() + .map(m -> new PipelineMetadata( + m.hostInfo().host() + ":" + m.hostInfo().port(), + m.topicPartitions() + .stream() + .map(TopicPartition::toString) + .collect(Collectors.toSet())) + ) + .collect(Collectors.toList()); +} +---- + +The `getWeatherStationData()` method first checks now which node in the application cluster is the master for the given id. +It can either be stored locally, in which case the data will be obtained from the local state store, +or it can be stored on one of the other nodes, in which case the information about that node (host and port) +will be returned. + +The `getMetaData()` method is added to provide callers with a list of all the nodes in the application cluster. + +The `GetWeatherStationDataResult` type must be adjusted accordingly: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.util.Optional; +import java.util.OptionalInt; + +import org.acme.quarkus.sample.kafkastreams.model.WeatherStationData; + +public class GetWeatherStationDataResult { + + private static GetWeatherStationDataResult NOT_FOUND = new GetWeatherStationDataResult(null, null, null); + + private final WeatherStationData result; + private final String host; + private final Integer port; + + private GetWeatherStationDataResult(WeatherStationData result, String host, Integer port) { + this.result = result; + this.host = host; + this.port = port; + } + + public static GetWeatherStationDataResult found(WeatherStationData data) { + return new GetWeatherStationDataResult(data, null, null); + } + + public static GetWeatherStationDataResult foundRemotely(String host, int port) { + return new GetWeatherStationDataResult(null, host, port); + } + + public static GetWeatherStationDataResult notFound() { + return NOT_FOUND; + } + + public Optional getResult() { + return Optional.ofNullable(result); + } + + public Optional getHost() { + return Optional.ofNullable(host); + } + + public OptionalInt getPort() { + return port != null ? OptionalInt.of(port) : OptionalInt.empty(); + } +} +---- + +Also the return type for `getMetaData()` must be defined +(`aggregator/src/main/java/org/acme/quarkus/sample/kafkastreams/streams/PipelineMetadata.java`) + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.streams; + +import java.util.Set; + +public class PipelineMetadata { + + public String host; + public Set partitions; + + public PipelineMetadata(String host, Set partitions) { + this.host = host; + this.partitions = partitions; + } +} +---- + +Lastly, the REST endpoint class must be updated: + +[source, java] +---- +package org.acme.quarkus.sample.kafkastreams.rest; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + +import org.acme.quarkus.sample.kafkastreams.streams.GetWeatherStationDataResult; +import org.acme.quarkus.sample.kafkastreams.streams.KafkaStreamsPipeline; +import org.acme.quarkus.sample.kafkastreams.streams.PipelineMetadata; + +@ApplicationScoped +@Path("/weather-stations") +public class WeatherStationEndpoint { + + @Inject + KafkaStreamsPipeline pipeline; + + @GET + @Path("/data/{id}") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response getWeatherStationData(@PathParam("id") int id) { + GetWeatherStationDataResult result = pipeline.getWeatherStationData(id); + + if (result.getResult().isPresent()) { + return Response.ok(result.getResult().get()).build(); + } + else if (result.getHost().isPresent()) { + URI otherUri = getOtherUri(result.getHost().get(), result.getPort().getAsInt(), id); + return Response.seeOther(otherUri).build(); + } + else { + return Response.status(Status.NOT_FOUND.getStatusCode(), "No data found for weather station " + id).build(); + } + } + + @GET + @Path("/meta-data") + @Produces(MediaType.APPLICATION_JSON) + public List getMetaData() { + return pipeline.getMetaData(); + } + + private URI getOtherUri(String host, int port, int id) { + try { + return new URI("http://" + host + ":" + port + "/weather-stations/data/" + id); + } + catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } +} +---- + +The `getWeatherStationData()` resource method will now reply with a redirect +(HTTP status code 303) if the data for the given key is stored on one of the other nodes. + +Now stop the `aggregator` service again and rebuild it. +Then let's spin up three instances of it: + +[source, subs=attributes+] +---- +mvn clean package -f aggregator/pom.xml +docker-compose stop aggregator +docker-compose up --build -d --scale aggregator=3 +---- + +When invoking the REST API on any of the three instances, it might either be +that the aggregation for the requested weather station id is stored locally on the node receiving the query, +or it could be stored on one of the other two nodes. + +As the load balancer of Docker Compose will distribute requests to the _aggregator_ service in a round-robin fashion, +we'll invoke the actual nodes directly. +The application exposes information about all the host names via REST: + +[source, subs=attributes+] +---- +http aggregator:8080/weather-stations/meta-data + +HTTP/1.1 200 OK +Connection: keep-alive +Content-Length: 202 +Content-Type: application/json +Date: Tue, 18 Jun 2019 20:00:23 GMT + +[ + { + "host": "2af13fe516a9:8080", + "partitions": [ + "temperature-values-2" + ] + }, + { + "host": "32cc8309611b:8080", + "partitions": [ + "temperature-values-1" + ] + }, + { + "host": "1eb39af8d587:8080", + "partitions": [ + "temperature-values-0" + ] + } +] +---- + +Retrieve the data from one of the three hosts shown in the response +(your actual host names will differ): + +[source, subs=attributes+] +---- +http 2af13fe516a9:8080/weather-stations/data/1 +---- + +If that node holds the data for key "1", you'll get a response like this: + +``` +HTTP/1.1 200 OK +Connection: keep-alive +Content-Length: 74 +Content-Type: application/json +Date: Tue, 11 Jun 2019 19:16:31 GMT + +{ + "avg": 11.9, + "count": 259, + "max": 50.0, + "min": -30.1, + "stationId": 1, + "stationName": "Hamburg" +} +``` + +Otherwise, the service will send a redirect: + +``` +HTTP/1.1 303 See Other +Connection: keep-alive +Content-Length: 0 +Date: Tue, 18 Jun 2019 20:01:03 GMT +Location: http://1eb39af8d587:8080/weather-stations/data/1 +``` + +You can have _httpie_ automatically follow the redirect by passing the `--follow option`: + +```bash +http --follow 2af13fe516a9:8080/weather-stations/data/1 +``` + +== Running Native + +You can build the native executable with: + +[source, shell] +---- +mvn package -Pnative +---- + +== Going Further + +This guide has shown how you can interact with Kafka using Quarkus. +It utilizes MicroProfile Reactive Messaging to build data streaming applications. + +If you want to go further check the documentation of https://smallrye.io/smallrye-reactive-messaging[SmallRye Reactive Messaging], the implementation used in Quarkus.