diff --git a/bom/pom.xml b/bom/pom.xml
index 08bbe283bab76..6fdeacbf1b2a8 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -347,6 +347,11 @@
quarkus-kafka-client
${project.version}
+
+ io.quarkus
+ quarkus-kafka-streams
+ ${project.version}
+
io.quarkus
quarkus-smallrye-health
@@ -1048,6 +1053,11 @@
kafka-clients
${kafka-clients.version}
+
+ org.apache.kafka
+ kafka-streams
+ ${kafka-clients.version}
+
org.apache.kafka
kafka_2.12
diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
index 0c14c419a3854..16a5306118ac4 100644
--- a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
+++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java
@@ -29,6 +29,7 @@ public final class FeatureBuildItem extends MultiBuildItem {
public static final String JDBC_MARIADB = "jdbc-mariadb";
public static final String JDBC_POSTGRESQL = "jdbc-postgresql";
public static final String JDBC_MSSQL = "jdbc-mssql";
+ public static final String KAFKA_STREAMS = "kafka-streams";
public static final String KEYCLOAK = "keycloak";
public static final String KOTLIN = "kotlin";
public static final String MAILER = "mailer";
diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml
new file mode 100644
index 0000000000000..1741cf4fef73f
--- /dev/null
+++ b/extensions/kafka-streams/deployment/pom.xml
@@ -0,0 +1,84 @@
+
+
+
+
+
+ quarkus-kafka-streams-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-kafka-streams-deployment
+ Quarkus - Kafka Streams - Deployment
+
+
+
+ io.quarkus
+ quarkus-core-deployment
+
+
+ io.quarkus
+ quarkus-arc-deployment
+
+
+ io.quarkus
+ quarkus-undertow-deployment
+
+
+ io.quarkus
+ quarkus-jsonp-deployment
+
+
+ io.quarkus
+ quarkus-kafka-streams
+
+
+
+
+ io.quarkus
+ quarkus-junit5-internal
+ test
+
+
+ io.rest-assured
+ rest-assured
+ test
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
+
diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
new file mode 100644
index 0000000000000..77b95dae935c6
--- /dev/null
+++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.quarkus.kafka.streams.deployment;
+
+import java.io.IOException;
+
+import org.apache.kafka.common.serialization.Serdes.ByteArraySerde;
+import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
+import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+
+import io.quarkus.deployment.annotations.BuildProducer;
+import io.quarkus.deployment.annotations.BuildStep;
+import io.quarkus.deployment.annotations.ExecutionTime;
+import io.quarkus.deployment.annotations.Record;
+import io.quarkus.deployment.builditem.FeatureBuildItem;
+import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem;
+import io.quarkus.deployment.builditem.substrate.SubstrateResourceBuildItem;
+import io.quarkus.deployment.recording.RecorderContext;
+import io.quarkus.kafka.streams.runtime.KafkaStreamsTemplate;
+
+class KafkaStreamsProcessor {
+
+ @BuildStep
+ @Record(ExecutionTime.STATIC_INIT)
+ void build(RecorderContext recorder,
+ BuildProducer feature,
+ BuildProducer reflectiveClasses,
+ BuildProducer nativeLibs) throws IOException {
+
+ feature.produce(new FeatureBuildItem(FeatureBuildItem.KAFKA_STREAMS));
+
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultProductionExceptionHandler.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, LogAndFailExceptionHandler.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, ByteArraySerde.class));
+ reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, FailOnInvalidTimestamp.class));
+
+ // TODO Can it be chosen based on platform we're running on?
+ nativeLibs.produce(new SubstrateResourceBuildItem("librocksdbjni-linux64.so"));
+ nativeLibs.produce(new SubstrateResourceBuildItem("librocksdbjni-osx.jnilib"));
+ }
+
+ @BuildStep
+ @Record(ExecutionTime.RUNTIME_INIT)
+ void build(KafkaStreamsTemplate template) {
+ // Explicitly loading RocksDB native libs, as that's normally done from within
+ // static initializers which already ran during build
+ template.loadRocksDb();
+ }
+}
diff --git a/extensions/kafka-streams/pom.xml b/extensions/kafka-streams/pom.xml
new file mode 100644
index 0000000000000..cb6bbfa5ab669
--- /dev/null
+++ b/extensions/kafka-streams/pom.xml
@@ -0,0 +1,37 @@
+
+
+
+
+
+ quarkus-build-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../../build-parent/pom.xml
+
+ 4.0.0
+
+ quarkus-kafka-streams-parent
+ Quarkus - Kafka Streams
+
+ pom
+
+ deployment
+ runtime
+
+
diff --git a/extensions/kafka-streams/runtime/pom.xml b/extensions/kafka-streams/runtime/pom.xml
new file mode 100644
index 0000000000000..eb1a37fd1e864
--- /dev/null
+++ b/extensions/kafka-streams/runtime/pom.xml
@@ -0,0 +1,67 @@
+
+
+
+
+
+ quarkus-kafka-streams-parent
+ io.quarkus
+ 999-SNAPSHOT
+ ../
+
+ 4.0.0
+
+ quarkus-kafka-streams
+ Quarkus - Kafka Streams - Runtime
+
+
+
+ io.quarkus
+ quarkus-core
+
+
+ org.apache.kafka
+ kafka-streams
+
+
+ com.oracle.substratevm
+ svm
+
+
+
+
+
+
+ io.quarkus
+ quarkus-bootstrap-maven-plugin
+
+
+ maven-compiler-plugin
+
+
+
+ io.quarkus
+ quarkus-extension-processor
+ ${project.version}
+
+
+
+
+
+
+
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java
new file mode 100644
index 0000000000000..06332490cb4f4
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime;
+
+import org.rocksdb.RocksDB;
+
+import io.quarkus.runtime.annotations.Template;
+
+@Template
+public class KafkaStreamsTemplate {
+
+ public void loadRocksDb() {
+ RocksDB.loadLibrary();
+ }
+}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsJniRegistration.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsJniRegistration.java
new file mode 100644
index 0000000000000..d58cd28247e5c
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsJniRegistration.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime.graal;
+
+import org.graalvm.nativeimage.Feature;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.FlushOptions;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.WriteBatch;
+
+import com.oracle.svm.core.annotate.AutomaticFeature;
+import com.oracle.svm.hosted.jni.JNIRuntimeAccess;
+
+@AutomaticFeature
+class KafkaStreamsJniRegistration implements Feature {
+
+ @Override
+ public void beforeAnalysis(BeforeAnalysisAccess access) {
+ JNIRuntimeAccess.register(
+ BlockBasedTableConfig.class,
+ FlushOptions.class,
+ Options.class,
+ RocksDB.class,
+ WriteBatch.class);
+
+ try {
+ JNIRuntimeAccess.register(Options.class.getDeclaredMethod("newOptions"));
+ } catch (NoSuchMethodException | SecurityException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java
new file mode 100644
index 0000000000000..4de606b5f3c7d
--- /dev/null
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2019 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.quarkus.kafka.streams.runtime.graal;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.rocksdb.CompressionType;
+
+import com.oracle.svm.core.annotate.Alias;
+import com.oracle.svm.core.annotate.RecomputeFieldValue;
+import com.oracle.svm.core.annotate.RecomputeFieldValue.Kind;
+import com.oracle.svm.core.annotate.Substitute;
+import com.oracle.svm.core.annotate.TargetClass;
+
+/**
+ * loadLibrary() loads the RocksDB native libraries and is called from multiple static initializers.
+ * It's designed to run only once (guarded by the {@code libraryLoaded} field). As I don't know how
+ * to reset this field to its initial value (cannot access the original {@code LibraryState} type
+ * from within the substitution class, the entire method is substituted.
+ */
+@TargetClass(className = "org.rocksdb.RocksDB")
+final class Target_org_rocksdb_RocksDB {
+
+ private enum LibraryState {
+ NOT_LOADED,
+ LOADING,
+ LOADED
+ }
+
+ @Alias
+ @RecomputeFieldValue(kind = Kind.FromAlias)
+ private static AtomicReference libraryLoaded = new AtomicReference<>(LibraryState.NOT_LOADED);
+
+ @Substitute
+ public static void loadLibrary() {
+ System.out.println("KSS 1");
+ if (libraryLoaded.get() == LibraryState.LOADED) {
+ return;
+ }
+
+ System.out.println("KSS 2");
+
+ if (libraryLoaded.compareAndSet(LibraryState.NOT_LOADED,
+ LibraryState.LOADING)) {
+ final String tmpDir = System.getenv("ROCKSDB_SHAREDLIB_DIR");
+ // loading possibly necessary libraries.
+ for (final CompressionType compressionType : CompressionType.values()) {
+ try {
+ if (compressionType.getLibraryName() != null) {
+ System.loadLibrary(compressionType.getLibraryName());
+ }
+ } catch (UnsatisfiedLinkError e) {
+ // since it may be optional, we ignore its loading failure here.
+ }
+ }
+
+ System.out.println("KSS 3");
+ try {
+ org.rocksdb.NativeLibraryLoader.getInstance().loadLibrary(tmpDir);
+ System.out.println("KSS 4");
+ } catch (IOException e) {
+ System.out.println("KSS 5");
+ libraryLoaded.set(LibraryState.NOT_LOADED);
+ throw new RuntimeException("Unable to load the RocksDB shared library", e);
+ }
+
+ System.out.println("KSS 6");
+ libraryLoaded.set(LibraryState.LOADED);
+ return;
+ }
+
+ while (libraryLoaded.get() == LibraryState.LOADING) {
+ try {
+ Thread.sleep(10);
+ } catch (final InterruptedException e) {
+ //ignore
+ }
+ }
+ }
+}
+
+/**
+ * Resets the {@code initialized} field, so that the native libs are loaded again at
+ * image runtime, after they have been loaded once at build time via calls from static
+ * initializers.
+ */
+@TargetClass(className = "org.rocksdb.NativeLibraryLoader")
+final class Target_org_rocksdb_NativeLibraryLoader {
+
+ @Alias
+ @RecomputeFieldValue(kind = RecomputeFieldValue.Kind.Reset)
+ private static boolean initialized = false;
+}
+
+public final class KafkaStreamsSubstitutions {
+}
diff --git a/extensions/pom.xml b/extensions/pom.xml
index 28de77bceba89..8b42fa37d2ec7 100644
--- a/extensions/pom.xml
+++ b/extensions/pom.xml
@@ -81,6 +81,7 @@
hibernate-search-elasticsearch
elasticsearch-rest-client
kafka-client
+ kafka-streams
spring-di