diff --git a/bom/pom.xml b/bom/pom.xml index 08bbe283bab76c..6fdeacbf1b2a8b 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -347,6 +347,11 @@ quarkus-kafka-client ${project.version} + + io.quarkus + quarkus-kafka-streams + ${project.version} + io.quarkus quarkus-smallrye-health @@ -1048,6 +1053,11 @@ kafka-clients ${kafka-clients.version} + + org.apache.kafka + kafka-streams + ${kafka-clients.version} + org.apache.kafka kafka_2.12 diff --git a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java index 0c14c419a38546..16a5306118ac4b 100644 --- a/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java +++ b/core/deployment/src/main/java/io/quarkus/deployment/builditem/FeatureBuildItem.java @@ -29,6 +29,7 @@ public final class FeatureBuildItem extends MultiBuildItem { public static final String JDBC_MARIADB = "jdbc-mariadb"; public static final String JDBC_POSTGRESQL = "jdbc-postgresql"; public static final String JDBC_MSSQL = "jdbc-mssql"; + public static final String KAFKA_STREAMS = "kafka-streams"; public static final String KEYCLOAK = "keycloak"; public static final String KOTLIN = "kotlin"; public static final String MAILER = "mailer"; diff --git a/extensions/kafka-streams/deployment/pom.xml b/extensions/kafka-streams/deployment/pom.xml new file mode 100644 index 00000000000000..1741cf4fef73ff --- /dev/null +++ b/extensions/kafka-streams/deployment/pom.xml @@ -0,0 +1,84 @@ + + + + + + quarkus-kafka-streams-parent + io.quarkus + 999-SNAPSHOT + ../ + + 4.0.0 + + quarkus-kafka-streams-deployment + Quarkus - Kafka Streams - Deployment + + + + io.quarkus + quarkus-core-deployment + + + io.quarkus + quarkus-arc-deployment + + + io.quarkus + quarkus-undertow-deployment + + + io.quarkus + quarkus-jsonp-deployment + + + io.quarkus + quarkus-kafka-streams + + + + + io.quarkus + quarkus-junit5-internal + test + + + io.rest-assured + rest-assured + test + + + + + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + + diff --git a/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java new file mode 100644 index 00000000000000..77b95dae935c69 --- /dev/null +++ b/extensions/kafka-streams/deployment/src/main/java/io/quarkus/kafka/streams/deployment/KafkaStreamsProcessor.java @@ -0,0 +1,68 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.quarkus.kafka.streams.deployment; + +import java.io.IOException; + +import org.apache.kafka.common.serialization.Serdes.ByteArraySerde; +import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; +import org.apache.kafka.streams.errors.LogAndFailExceptionHandler; +import org.apache.kafka.streams.processor.DefaultPartitionGrouper; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor; + +import io.quarkus.deployment.annotations.BuildProducer; +import io.quarkus.deployment.annotations.BuildStep; +import io.quarkus.deployment.annotations.ExecutionTime; +import io.quarkus.deployment.annotations.Record; +import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.substrate.ReflectiveClassBuildItem; +import io.quarkus.deployment.builditem.substrate.SubstrateResourceBuildItem; +import io.quarkus.deployment.recording.RecorderContext; +import io.quarkus.kafka.streams.runtime.KafkaStreamsTemplate; + +class KafkaStreamsProcessor { + + @BuildStep + @Record(ExecutionTime.STATIC_INIT) + void build(RecorderContext recorder, + BuildProducer feature, + BuildProducer reflectiveClasses, + BuildProducer nativeLibs) throws IOException { + + feature.produce(new FeatureBuildItem(FeatureBuildItem.KAFKA_STREAMS)); + + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, StreamsPartitionAssignor.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultPartitionGrouper.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, DefaultProductionExceptionHandler.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, LogAndFailExceptionHandler.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, ByteArraySerde.class)); + reflectiveClasses.produce(new ReflectiveClassBuildItem(true, false, false, FailOnInvalidTimestamp.class)); + + // TODO Can it be chosen based on platform we're running on? + nativeLibs.produce(new SubstrateResourceBuildItem("librocksdbjni-linux64.so")); + nativeLibs.produce(new SubstrateResourceBuildItem("librocksdbjni-osx.jnilib")); + } + + @BuildStep + @Record(ExecutionTime.RUNTIME_INIT) + void build(KafkaStreamsTemplate template) { + // Explicitly loading RocksDB native libs, as that's normally done from within + // static initializers which already ran during build + template.loadRocksDb(); + } +} diff --git a/extensions/kafka-streams/pom.xml b/extensions/kafka-streams/pom.xml new file mode 100644 index 00000000000000..cb6bbfa5ab6692 --- /dev/null +++ b/extensions/kafka-streams/pom.xml @@ -0,0 +1,37 @@ + + + + + + quarkus-build-parent + io.quarkus + 999-SNAPSHOT + ../../build-parent/pom.xml + + 4.0.0 + + quarkus-kafka-streams-parent + Quarkus - Kafka Streams + + pom + + deployment + runtime + + diff --git a/extensions/kafka-streams/runtime/pom.xml b/extensions/kafka-streams/runtime/pom.xml new file mode 100644 index 00000000000000..eb1a37fd1e8646 --- /dev/null +++ b/extensions/kafka-streams/runtime/pom.xml @@ -0,0 +1,67 @@ + + + + + + quarkus-kafka-streams-parent + io.quarkus + 999-SNAPSHOT + ../ + + 4.0.0 + + quarkus-kafka-streams + Quarkus - Kafka Streams - Runtime + + + + io.quarkus + quarkus-core + + + org.apache.kafka + kafka-streams + + + com.oracle.substratevm + svm + + + + + + + io.quarkus + quarkus-bootstrap-maven-plugin + + + maven-compiler-plugin + + + + io.quarkus + quarkus-extension-processor + ${project.version} + + + + + + + diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java new file mode 100644 index 00000000000000..06332490cb4f49 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsTemplate.java @@ -0,0 +1,28 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.quarkus.kafka.streams.runtime; + +import org.rocksdb.RocksDB; + +import io.quarkus.runtime.annotations.Template; + +@Template +public class KafkaStreamsTemplate { + + public void loadRocksDb() { + RocksDB.loadLibrary(); + } +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsJniRegistration.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsJniRegistration.java new file mode 100644 index 00000000000000..d58cd28247e5ca --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsJniRegistration.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.quarkus.kafka.streams.runtime.graal; + +import org.graalvm.nativeimage.Feature; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.FlushOptions; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.WriteBatch; + +import com.oracle.svm.core.annotate.AutomaticFeature; +import com.oracle.svm.hosted.jni.JNIRuntimeAccess; + +@AutomaticFeature +class KafkaStreamsJniRegistration implements Feature { + + @Override + public void beforeAnalysis(BeforeAnalysisAccess access) { + JNIRuntimeAccess.register( + BlockBasedTableConfig.class, + FlushOptions.class, + Options.class, + RocksDB.class, + WriteBatch.class); + + try { + JNIRuntimeAccess.register(Options.class.getDeclaredMethod("newOptions")); + } catch (NoSuchMethodException | SecurityException e) { + throw new RuntimeException(e); + } + } +} diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java new file mode 100644 index 00000000000000..4de606b5f3c7d1 --- /dev/null +++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/graal/KafkaStreamsSubstitutions.java @@ -0,0 +1,110 @@ +/* + * Copyright 2019 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.quarkus.kafka.streams.runtime.graal; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import org.rocksdb.CompressionType; + +import com.oracle.svm.core.annotate.Alias; +import com.oracle.svm.core.annotate.RecomputeFieldValue; +import com.oracle.svm.core.annotate.RecomputeFieldValue.Kind; +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; + +/** + * loadLibrary() loads the RocksDB native libraries and is called from multiple static initializers. + * It's designed to run only once (guarded by the {@code libraryLoaded} field). As I don't know how + * to reset this field to its initial value (cannot access the original {@code LibraryState} type + * from within the substitution class, the entire method is substituted. + */ +@TargetClass(className = "org.rocksdb.RocksDB") +final class Target_org_rocksdb_RocksDB { + + private enum LibraryState { + NOT_LOADED, + LOADING, + LOADED + } + + @Alias + @RecomputeFieldValue(kind = Kind.FromAlias) + private static AtomicReference libraryLoaded = new AtomicReference<>(LibraryState.NOT_LOADED); + + @Substitute + public static void loadLibrary() { + System.out.println("KSS 1"); + if (libraryLoaded.get() == LibraryState.LOADED) { + return; + } + + System.out.println("KSS 2"); + + if (libraryLoaded.compareAndSet(LibraryState.NOT_LOADED, + LibraryState.LOADING)) { + final String tmpDir = System.getenv("ROCKSDB_SHAREDLIB_DIR"); + // loading possibly necessary libraries. + for (final CompressionType compressionType : CompressionType.values()) { + try { + if (compressionType.getLibraryName() != null) { + System.loadLibrary(compressionType.getLibraryName()); + } + } catch (UnsatisfiedLinkError e) { + // since it may be optional, we ignore its loading failure here. + } + } + + System.out.println("KSS 3"); + try { + org.rocksdb.NativeLibraryLoader.getInstance().loadLibrary(tmpDir); + System.out.println("KSS 4"); + } catch (IOException e) { + System.out.println("KSS 5"); + libraryLoaded.set(LibraryState.NOT_LOADED); + throw new RuntimeException("Unable to load the RocksDB shared library", e); + } + + System.out.println("KSS 6"); + libraryLoaded.set(LibraryState.LOADED); + return; + } + + while (libraryLoaded.get() == LibraryState.LOADING) { + try { + Thread.sleep(10); + } catch (final InterruptedException e) { + //ignore + } + } + } +} + +/** + * Resets the {@code initialized} field, so that the native libs are loaded again at + * image runtime, after they have been loaded once at build time via calls from static + * initializers. + */ +@TargetClass(className = "org.rocksdb.NativeLibraryLoader") +final class Target_org_rocksdb_NativeLibraryLoader { + + @Alias + @RecomputeFieldValue(kind = RecomputeFieldValue.Kind.Reset) + private static boolean initialized = false; +} + +public final class KafkaStreamsSubstitutions { +} diff --git a/extensions/pom.xml b/extensions/pom.xml index 28de77bceba897..8b42fa37d2ec7c 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -81,6 +81,7 @@ hibernate-search-elasticsearch elasticsearch-rest-client kafka-client + kafka-streams spring-di