From 244d35349a68198e3ef32efe259d83cddaa17e09 Mon Sep 17 00:00:00 2001
From: Ozan Gunalp
Date: Mon, 9 Oct 2023 17:01:27 +0200
Subject: [PATCH] Kafka Streams: fire the event after creation and before
 scheduling the start

Schedules KafkaStreams#start at StartupEvent instead of @PostConstruct, so the
KafkaStreams event is fired once the client is created and before its start is
scheduled.
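
As an illustration of the new ordering (not part of this change set), an
application bean can now observe the KafkaStreams event while the client is
still in CREATED state, for example to attach a state listener before start()
is scheduled. The bean below is a hypothetical sketch; its name and listener
body are illustrative only:

    import jakarta.enterprise.context.ApplicationScoped;
    import jakarta.enterprise.event.Observes;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.jboss.logging.Logger;

    // Hypothetical application bean: the KafkaStreams event now arrives before
    // start() is scheduled, so the client is still in CREATED state; the
    // StreamsConfig parameter is resolved from the new singleton producer.
    @ApplicationScoped
    public class PipelineStateLogger {

        private static final Logger LOG = Logger.getLogger(PipelineStateLogger.class);

        void onKafkaStreams(@Observes KafkaStreams kafkaStreams, StreamsConfig config) {
            // Attaching a state listener is only legal while the client is in
            // CREATED state, i.e. before start() has been called.
            kafkaStreams.setStateListener((newState, oldState) -> LOG.infof(
                    "Kafka Streams '%s' moved from %s to %s",
                    config.getString(StreamsConfig.APPLICATION_ID_CONFIG), oldState, newState));
        }
    }

This mirrors the assertion added to KafkaStreamsEventCounter below.
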
Fixes #36341 and #36774

Corrects the Kafka Streams default Kafka broker to localhost:9092 instead of 9012.

(cherry picked from commit 130631d8ab872c2bb44a67c2ba4da4e179d731ad)
---
 .../streams/runtime/KafkaStreamsProducer.java     | 75 +++++++++++--------
 .../runtime/KafkaStreamsRuntimeConfig.java        |  4 +-
 .../streams/KafkaStreamsEventCounter.java         |  5 +-
 3 files changed, 48 insertions(+), 36 deletions(-)

diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java
index 4318cc16c35d2..f03ddabeb9bda 100644
--- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsProducer.java
@@ -15,13 +15,12 @@
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import jakarta.annotation.PostConstruct;
+import jakarta.enterprise.event.Event;
 import jakarta.enterprise.event.Observes;
 import jakarta.enterprise.inject.Instance;
 import jakarta.enterprise.inject.Produces;
@@ -44,10 +43,9 @@
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.jboss.logging.Logger;
 
-import io.quarkus.arc.Arc;
 import io.quarkus.arc.Unremovable;
 import io.quarkus.runtime.ShutdownEvent;
-import io.quarkus.runtime.Startup;
+import io.quarkus.runtime.StartupEvent;
 import io.smallrye.common.annotation.Identifier;
 
 /**
@@ -64,12 +62,16 @@ public class KafkaStreamsProducer {
     private static volatile boolean shutdown = false;
 
     private final ExecutorService executorService;
+    private final StreamsConfig streamsConfig;
     private final KafkaStreams kafkaStreams;
     private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
     private final Admin kafkaAdminClient;
+    private final Duration topicsTimeout;
+    private final List<String> trimmedTopics;
 
     @Inject
     public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig runtimeConfig,
+            ExecutorService executorService,
             Instance<Topology> topology, Instance<KafkaClientSupplier> kafkaClientSupplier,
             @Identifier("default-kafka-broker") Instance<Map<String, Object>> defaultConfiguration,
             Instance<KafkaStreams.StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener,
@@ -79,9 +81,12 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
         if (topology.isUnsatisfied()) {
             LOGGER.warn("No Topology producer; Kafka Streams will not be started");
             this.executorService = null;
+            this.streamsConfig = null;
             this.kafkaStreams = null;
             this.kafkaStreamsTopologyManager = null;
             this.kafkaAdminClient = null;
+            this.topicsTimeout = null;
+            this.trimmedTopics = null;
             return;
         }
 
@@ -101,25 +106,39 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
                 runtimeConfig);
 
         this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties));
-        this.executorService = Executors.newSingleThreadExecutor();
+        this.executorService = executorService;
 
-        this.kafkaStreams = initializeKafkaStreams(kafkaStreamsProperties, runtimeConfig, kafkaAdminClient, topology.get(),
-                kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener,
-                executorService);
+        this.topicsTimeout = runtimeConfig.topicsTimeout;
+        this.trimmedTopics = runtimeConfig.getTrimmedTopics();
+        this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
+        this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
+                kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
         this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient);
     }
 
-    @PostConstruct
-    public void postConstruct() {
+    public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
         if (kafkaStreams != null) {
-            Arc.container().beanManager().getEvent().select(KafkaStreams.class).fire(kafkaStreams);
+            kafkaStreamsEvent.fire(kafkaStreams);
+            executorService.execute(() -> {
+                if (topicsTimeout.compareTo(Duration.ZERO) > 0) {
+                    try {
+                        waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        return;
+                    }
+                }
+                if (!shutdown) {
+                    LOGGER.debug("Starting Kafka Streams pipeline");
+                    kafkaStreams.start();
+                }
+            });
         }
     }
 
     @Produces
     @Singleton
     @Unremovable
-    @Startup
     public KafkaStreams getKafkaStreams() {
         return kafkaStreams;
     }
@@ -127,7 +146,13 @@ public KafkaStreams getKafkaStreams() {
     @Produces
     @Singleton
     @Unremovable
-    @Startup
+    public StreamsConfig getStreamsConfig() {
+        return streamsConfig;
+    }
+
+    @Produces
+    @Singleton
+    @Unremovable
     public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
         return kafkaStreamsTopologyManager;
     }
@@ -146,16 +171,15 @@ void onStop(@Observes ShutdownEvent event) {
         }
     }
 
-    private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProperties,
-            KafkaStreamsRuntimeConfig runtimeConfig, Admin adminClient, Topology topology,
+    private static KafkaStreams initializeKafkaStreams(StreamsConfig streamsConfig, Topology topology,
             Instance<KafkaClientSupplier> kafkaClientSupplier, Instance<KafkaStreams.StateListener> stateListener,
             Instance<StateRestoreListener> globalStateRestoreListener,
-            Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener, ExecutorService executorService) {
+            Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener) {
         KafkaStreams kafkaStreams;
         if (kafkaClientSupplier.isUnsatisfied()) {
-            kafkaStreams = new KafkaStreams(topology, kafkaStreamsProperties);
+            kafkaStreams = new KafkaStreams(topology, streamsConfig);
         } else {
-            kafkaStreams = new KafkaStreams(topology, kafkaStreamsProperties, kafkaClientSupplier.get());
+            kafkaStreams = new KafkaStreams(topology, streamsConfig, kafkaClientSupplier.get());
         }
 
         if (!stateListener.isUnsatisfied()) {
@@ -168,21 +192,6 @@ private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProper
             kafkaStreams.setUncaughtExceptionHandler(uncaughtExceptionHandlerListener.get());
         }
 
-        executorService.execute(() -> {
-            if (runtimeConfig.topicsTimeout.compareTo(Duration.ZERO) > 0) {
-                try {
-                    waitForTopicsToBeCreated(adminClient, runtimeConfig.getTrimmedTopics(), runtimeConfig.topicsTimeout);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    return;
-                }
-            }
-            if (!shutdown) {
-                LOGGER.debug("Starting Kafka Streams pipeline");
-                kafkaStreams.start();
-            }
-        });
-
         return kafkaStreams;
     }
 
diff --git a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
index 7e4e6347650d9..7a5b58d54a580 100644
--- a/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
+++ b/extensions/kafka-streams/runtime/src/main/java/io/quarkus/kafka/streams/runtime/KafkaStreamsRuntimeConfig.java
@@ -16,7 +16,7 @@ public class KafkaStreamsRuntimeConfig {
     /**
      * Default Kafka bootstrap server.
      */
-    public static final String DEFAULT_KAFKA_BROKER = "localhost:9012";
+    public static final String DEFAULT_KAFKA_BROKER = "localhost:9092";
 
     /**
      * A unique identifier for this Kafka Streams application.
      */
@@ -27,7 +27,7 @@ public class KafkaStreamsRuntimeConfig {
 
     /**
      * A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
-     * If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9012}.
+     * If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9092}.
      */
     @ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
     public List<String> bootstrapServers;
diff --git a/integration-tests/kafka-streams/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsEventCounter.java b/integration-tests/kafka-streams/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsEventCounter.java
index e7affadc599dd..95ab6a5c52735 100644
--- a/integration-tests/kafka-streams/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsEventCounter.java
+++ b/integration-tests/kafka-streams/src/main/java/io/quarkus/it/kafka/streams/KafkaStreamsEventCounter.java
@@ -6,13 +6,16 @@
 import jakarta.enterprise.event.Observes;
 
 import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
 
 @ApplicationScoped
 public class KafkaStreamsEventCounter {
 
     LongAdder eventCount = new LongAdder();
 
-    void onKafkaStreamsEvent(@Observes KafkaStreams kafkaStreams) {
+    void onKafkaStreamsEvent(@Observes KafkaStreams kafkaStreams, StreamsConfig streamsConfig) {
+        assert kafkaStreams.state() == KafkaStreams.State.CREATED;
+        assert streamsConfig != null;
         eventCount.increment();
     }
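
For reference, a minimal configuration sketch (not part of this patch) showing
the bootstrap-server fallback described above; the property values are
illustrative only:

    # application.properties (illustrative values)
    # If bootstrap-servers is unset, the extension falls back to
    # kafka.bootstrap.servers and then to the corrected default localhost:9092.
    quarkus.kafka-streams.application-id=temperature-aggregator
    quarkus.kafka-streams.bootstrap-servers=localhost:9092
    quarkus.kafka-streams.topics=temperature-values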