diff --git a/bom/pom.xml b/bom/pom.xml
index 38f3cb04d3268..6cada6aa54fc1 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -363,6 +363,11 @@
quarkus-kafka-client
${project.version}
+
+ io.quarkus
+ quarkus-kafka-streams
+ ${project.version}
+
io.quarkus
quarkus-smallrye-health
@@ -1085,6 +1090,11 @@
kafka-clients
${kafka-clients.version}
+
+ org.apache.kafka
+ kafka-streams
+ ${kafka-clients.version}
+
org.apache.kafka
kafka_2.12
diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
index 43e9297bbc72f..597cbfefbb229 100644
--- a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
+++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
@@ -32,6 +32,7 @@ public final class FeatureBuildItem extends MultiBuildItem {
public static final String JDBC_MARIADB = "jdbc-mariadb";
public static final String JDBC_POSTGRESQL = "jdbc-postgresql";
public static final String JDBC_MSSQL = "jdbc-mssql";
+ public static final String KAFKA_STREAMS = "kafka-streams";
public static final String KEYCLOAK = "keycloak";
public static final String KOTLIN = "kotlin";
public static final String MAILER = "mailer";
diff --git a/devtools/common/src/main/filtered/extensions.json b/devtools/common/src/main/filtered/extensions.json
index c39f310335eed..6070f30f0418d 100644
--- a/devtools/common/src/main/filtered/extensions.json
+++ b/devtools/common/src/main/filtered/extensions.json
@@ -497,5 +497,22 @@
],
"groupId": "io.quarkus",
"artifactId": "quarkus-reactive-pg-client"
+ },
+ {
+ "name": "Apache Kafka Client",
+ "labels": [
+ "kafka"
+ ],
+ "groupId": "io.quarkus",
+ "artifactId": "quarkus-kafka-client"
+ },
+ {
+ "name": "Apache Kafka Streams",
+ "labels": [
+ "kafka",
+ "kafka-streams"
+ ],
+ "groupId": "io.quarkus",
+ "artifactId": "quarkus-kafka-streams"
}
]
diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml
new file mode 100644
index 0000000000000..e13d762d978eb
--- /dev/null
+++ b/extensions/kafka-streams/deployment/pom.xml
@@ -0,0 +1,72 @@
+
+
+
+
+
+ quarkus-kafka-streams-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-kafka-streams-deployment
+ Quarkus - Kafka Streams - Deployment
+
+
+
+ io.quarkus
+ quarkus-core-deployment
+
+
+ io.quarkus
+ quarkus-kafka-streams
+
+
+
+
+ io.quarkus
+ quarkus-junit5-internal
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
new file mode 100644
index 0000000000000..0bde6d29b1630
--- /dev/null
+++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.quarkus.kafka.streams.deployment;
+
+import java.io.IOException;
+
+import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.rocksdb.util.Environment;
+
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.ExecutionTime;
+import io.quarkus.deployment.annotations.Record;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
+import io.quarkus.deployment.builditem.substrate.RuntimeReinitializedClassBuildItem;
+import io.quarkus.deployment.builditem.substrate.SubstrateResourceBuildItem;
+import io.quarkus.deployment.recording.RecorderContext;
+import io.quarkus.kafka.streams.runtime.KafkaStreamsTemplate;
+
+class KafkaStreamsProcessor {
+
+ @BuildStep
+ @Record(ExecutionTime.STATIC_INIT)
+ void build(RecorderContext recorder,
+ BuildProducer feature,
+ BuildProducer reflectiveClasses,
+ BuildProducer reinitialized,
+ BuildProducer nativeLibs) throws IOException {
+
+ feature.produce(new FeatureBuildItem(FeatureBuildItem.KAFKA_STREAMS));
+
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultProductionExceptionHandler.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, LogAndFailExceptionHandler.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, ByteArraySerde.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, FailOnInvalidTimestamp.class));
+
+ nativeLibs.produce(new SubstrateResourceBuildItem(Environment.getJniLibraryFileName("rocksdb")));
+
+ // re-initializing RocksDB to enable load of native libs
+ reinitialized.produce(new RuntimeReinitializedClassBuildItem("org.rocksdb.RocksDB"));
+ }
+
+ @BuildStep
+ @Record(ExecutionTime.RUNTIME_INIT)
+ void build(KafkaStreamsTemplate template) {
+ // Explicitly loading RocksDB native libs, as that's normally done from within
+ // static initializers which already ran during build
+ template.loadRocksDb();
+ }
+}
diff --git a/extensions/kafka-streams/pom.xml b/extensions/kafka-streams/pom.xml
new file mode 100644
index 0000000000000..cb6bbfa5ab669
--- /dev/null
+++ b/extensions/kafka-streams/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+
+
+ quarkus-build-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../../build-parent/pom.xml
+
+ 4.0.0
+
+ quarkus-kafka-streams-parent
+ Quarkus - Kafka Streams
+
+ pom
+
+ deployment
+ runtime
+
+
diff --git a/extensions/kafka-streams/runtime/pom.xml b/extensions/kafka-streams/runtime/pom.xml
new file mode 100644
index 0000000000000..5a576783aec98
--- /dev/null
+++ b/extensions/kafka-streams/runtime/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+
+
+ quarkus-kafka-streams-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-kafka-streams
+ Quarkus - Kafka Streams - Runtime
+
+
+
+ io.quarkus
+ quarkus-core
+
+
+ io.quarkus
+ quarkus-kafka-client
+
+
+ org.apache.kafka
+ kafka-streams
+
+
+ com.oracle.substratevm
+ svm
+
+
+
+
+
+
+ io.quarkus
+ quarkus-bootstrap-maven-plugin
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java
new file mode 100644
index 0000000000000..06332490cb4f4
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime;
+
+import org.rocksdb.RocksDB;
+
+import io.quarkus.runtime.annotations.Template;
+
+@Template
+public class KafkaStreamsTemplate {
+
+ public void loadRocksDb() {
+ RocksDB.loadLibrary();
+ }
+}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java
new file mode 100644
index 0000000000000..007357b8f6cab
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime.graal;
+
+import com.oracle.svm.core.annotate.Alias;
+import com.oracle.svm.core.annotate.RecomputeFieldValue;
+import com.oracle.svm.core.annotate.TargetClass;
+
+/**
+ * Resets the {@code initialized} field, so that the native libs are loaded again at
+ * image runtime, after they have been loaded once at build time via calls from static
+ * initializers.
+ */
+@TargetClass(className = "org.rocksdb.NativeLibraryLoader")
+final class Target_org_rocksdb_NativeLibraryLoader {
+
+ @Alias
+ @RecomputeFieldValue(kind = RecomputeFieldValue.Kind.Reset)
+ private static boolean initialized = false;
+}
+
+public final class KafkaStreamsSubstitutions {
+}
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 28de77bceba89..8b42fa37d2ec7 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -81,6 +81,7 @@
hibernate-search-elasticsearch
elasticsearch-rest-client
kafka-client
+ kafka-streams
spring-di
diff --git a/integration-tests/kafka/pom.xml b/integration-tests/kafka/pom.xml
index 319bc05d480f2..6eb67162f84d5 100644
--- a/integration-tests/kafka/pom.xml
+++ b/integration-tests/kafka/pom.xml
@@ -43,18 +43,30 @@
io.quarkus
quarkus-integration-test-shared-library
+
+ org.awaitility
+ awaitility
+
io.quarkus
quarkus-resteasy
+
+ io.quarkus
+ quarkus-resteasy-jsonb
+
io.quarkus
quarkus-kafka-client
+
+ io.quarkus
+ quarkus-kafka-streams
+
@@ -140,6 +152,7 @@
true
true
+ true