diff --git a/bom/pom.xml b/bom/pom.xml
index f5f2b0574b4b38..2634e9639dd52a 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -365,6 +365,11 @@
quarkus-kafka-client
${project.version}
+
+ io.quarkus
+ quarkus-kafka-streams
+ ${project.version}
+
io.quarkus
quarkus-smallrye-health
@@ -1093,6 +1098,11 @@
kafka-clients
${kafka-clients.version}
+
+ org.apache.kafka
+ kafka-streams
+ ${kafka-clients.version}
+
org.apache.kafka
kafka_2.12
diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
index 43e9297bbc72fc..597cbfefbb2297 100644
--- a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
+++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
@@ -32,6 +32,7 @@ public final class FeatureBuildItem extends MultiBuildItem {
public static final String JDBC_MARIADB = "jdbc-mariadb";
public static final String JDBC_POSTGRESQL = "jdbc-postgresql";
public static final String JDBC_MSSQL = "jdbc-mssql";
+ public static final String KAFKA_STREAMS = "kafka-streams";
public static final String KEYCLOAK = "keycloak";
public static final String KOTLIN = "kotlin";
public static final String MAILER = "mailer";
diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml
new file mode 100644
index 00000000000000..e13d762d978eb7
--- /dev/null
+++ b/extensions/kafka-streams/deployment/pom.xml
@@ -0,0 +1,72 @@
+
+
+
+
+
+ quarkus-kafka-streams-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-kafka-streams-deployment
+ Quarkus - Kafka Streams - Deployment
+
+
+
+ io.quarkus
+ quarkus-core-deployment
+
+
+ io.quarkus
+ quarkus-kafka-streams
+
+
+
+
+ io.quarkus
+ quarkus-junit5-internal
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
new file mode 100644
index 00000000000000..0bde6d29b16307
--- /dev/null
+++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.quarkus.kafka.streams.deployment;
+
+import java.io.IOException;
+
+import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.rocksdb.util.Environment;
+
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.ExecutionTime;
+import io.quarkus.deployment.annotations.Record;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
+import io.quarkus.deployment.builditem.substrate.RuntimeReinitializedClassBuildItem;
+import io.quarkus.deployment.builditem.substrate.SubstrateResourceBuildItem;
+import io.quarkus.deployment.recording.RecorderContext;
+import io.quarkus.kafka.streams.runtime.KafkaStreamsTemplate;
+
+class KafkaStreamsProcessor {
+
+ @BuildStep
+ @Record(ExecutionTime.STATIC_INIT)
+ void build(RecorderContext recorder,
+ BuildProducer feature,
+ BuildProducer reflectiveClasses,
+ BuildProducer reinitialized,
+ BuildProducer nativeLibs) throws IOException {
+
+ feature.produce(new FeatureBuildItem(FeatureBuildItem.KAFKA_STREAMS));
+
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultProductionExceptionHandler.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, LogAndFailExceptionHandler.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, ByteArraySerde.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, FailOnInvalidTimestamp.class));
+
+ nativeLibs.produce(new SubstrateResourceBuildItem(Environment.getJniLibraryFileName("rocksdb")));
+
+ // re-initializing RocksDB to enable load of native libs
+ reinitialized.produce(new RuntimeReinitializedClassBuildItem("org.rocksdb.RocksDB"));
+ }
+
+ @BuildStep
+ @Record(ExecutionTime.RUNTIME_INIT)
+ void build(KafkaStreamsTemplate template) {
+ // Explicitly loading RocksDB native libs, as that's normally done from within
+ // static initializers which already ran during build
+ template.loadRocksDb();
+ }
+}
diff --git a/extensions/kafka-streams/pom.xml b/extensions/kafka-streams/pom.xml
new file mode 100644
index 00000000000000..cb6bbfa5ab6692
--- /dev/null
+++ b/extensions/kafka-streams/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+
+
+ quarkus-build-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../../build-parent/pom.xml
+
+ 4.0.0
+
+ quarkus-kafka-streams-parent
+ Quarkus - Kafka Streams
+
+ pom
+
+ deployment
+ runtime
+
+
diff --git a/extensions/kafka-streams/runtime/pom.xml b/extensions/kafka-streams/runtime/pom.xml
new file mode 100644
index 00000000000000..5a576783aec98c
--- /dev/null
+++ b/extensions/kafka-streams/runtime/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+
+
+ quarkus-kafka-streams-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-kafka-streams
+ Quarkus - Kafka Streams - Runtime
+
+
+
+ io.quarkus
+ quarkus-core
+
+
+ io.quarkus
+ quarkus-kafka-client
+
+
+ org.apache.kafka
+ kafka-streams
+
+
+ com.oracle.substratevm
+ svm
+
+
+
+
+
+
+ io.quarkus
+ quarkus-bootstrap-maven-plugin
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java
new file mode 100644
index 00000000000000..06332490cb4f49
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime;
+
+import org.rocksdb.RocksDB;
+
+import io.quarkus.runtime.annotations.Template;
+
+@Template
+public class KafkaStreamsTemplate {
+
+ public void loadRocksDb() {
+ RocksDB.loadLibrary();
+ }
+}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java
new file mode 100644
index 00000000000000..007357b8f6cab7
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime.graal;
+
+import com.oracle.svm.core.annotate.Alias;
+import com.oracle.svm.core.annotate.RecomputeFieldValue;
+import com.oracle.svm.core.annotate.TargetClass;
+
+/**
+ * Resets the {@code initialized} field, so that the native libs are loaded again at
+ * image runtime, after they have been loaded once at build time via calls from static
+ * initializers.
+ */
+@TargetClass(className = "org.rocksdb.NativeLibraryLoader")
+final class Target_org_rocksdb_NativeLibraryLoader {
+
+ @Alias
+ @RecomputeFieldValue(kind = RecomputeFieldValue.Kind.Reset)
+ private static boolean initialized = false;
+}
+
+public final class KafkaStreamsSubstitutions {
+}
diff --git a/extensions/pom.xml b/extensions/pom.xml
index e756a308130a40..d53cd8617c229a 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -81,6 +81,7 @@
hibernate-search-elasticsearch
elasticsearch-rest-client
kafka-client
+ kafka-streams
spring-di