Skip to content

Commit

Permalink
Kafka Streams fire event after created and before scheduling the start.
Browse files Browse the repository at this point in the history
schedules the KafkaStreams#start at StartupEvent instead of PostConstruct

Fixes quarkusio#36341 and quarkusio#36774

Corrects Kafka streams default kafka broker to localhost:9092 instead of 9012

(cherry picked from commit 130631d)
  • Loading branch information
ozangunalp authored and gsmet committed Feb 10, 2024
1 parent 04fdc6e commit 244d353
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

import jakarta.annotation.PostConstruct;
import jakarta.enterprise.event.Event;
import jakarta.enterprise.event.Observes;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
Expand All @@ -44,10 +43,9 @@
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;

import io.quarkus.arc.Arc;
import io.quarkus.arc.Unremovable;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.Startup;
import io.quarkus.runtime.StartupEvent;
import io.smallrye.common.annotation.Identifier;

/**
Expand All @@ -64,12 +62,16 @@ public class KafkaStreamsProducer {
private static volatile boolean shutdown = false;

private final ExecutorService executorService;
private final StreamsConfig streamsConfig;
private final KafkaStreams kafkaStreams;
private final KafkaStreamsTopologyManager kafkaStreamsTopologyManager;
private final Admin kafkaAdminClient;
private final Duration topicsTimeout;
private final List<String> trimmedTopics;

@Inject
public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStreamsRuntimeConfig runtimeConfig,
ExecutorService executorService,
Instance<Topology> topology, Instance<KafkaClientSupplier> kafkaClientSupplier,
@Identifier("default-kafka-broker") Instance<Map<String, Object>> defaultConfiguration,
Instance<StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener,
Expand All @@ -79,9 +81,12 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
if (topology.isUnsatisfied()) {
LOGGER.warn("No Topology producer; Kafka Streams will not be started");
this.executorService = null;
this.streamsConfig = null;
this.kafkaStreams = null;
this.kafkaStreamsTopologyManager = null;
this.kafkaAdminClient = null;
this.topicsTimeout = null;
this.trimmedTopics = null;
return;
}

Expand All @@ -101,33 +106,53 @@ public KafkaStreamsProducer(KafkaStreamsSupport kafkaStreamsSupport, KafkaStream
runtimeConfig);
this.kafkaAdminClient = Admin.create(getAdminClientConfig(kafkaStreamsProperties));

this.executorService = Executors.newSingleThreadExecutor();
this.executorService = executorService;

this.kafkaStreams = initializeKafkaStreams(kafkaStreamsProperties, runtimeConfig, kafkaAdminClient, topology.get(),
kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener,
executorService);
this.topicsTimeout = runtimeConfig.topicsTimeout;
this.trimmedTopics = runtimeConfig.getTrimmedTopics();
this.streamsConfig = new StreamsConfig(kafkaStreamsProperties);
this.kafkaStreams = initializeKafkaStreams(streamsConfig, topology.get(),
kafkaClientSupplier, stateListener, globalStateRestoreListener, uncaughtExceptionHandlerListener);
this.kafkaStreamsTopologyManager = new KafkaStreamsTopologyManager(kafkaAdminClient);
}

@PostConstruct
public void postConstruct() {
public void onStartup(@Observes StartupEvent event, Event<KafkaStreams> kafkaStreamsEvent) {
if (kafkaStreams != null) {
Arc.container().beanManager().getEvent().select(KafkaStreams.class).fire(kafkaStreams);
kafkaStreamsEvent.fire(kafkaStreams);
executorService.execute(() -> {
if (topicsTimeout.compareTo(Duration.ZERO) > 0) {
try {
waitForTopicsToBeCreated(kafkaAdminClient, trimmedTopics, topicsTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
if (!shutdown) {
LOGGER.debug("Starting Kafka Streams pipeline");
kafkaStreams.start();
}
});
}
}

@Produces
@Singleton
@Unremovable
@Startup
public KafkaStreams getKafkaStreams() {
return kafkaStreams;
}

@Produces
@Singleton
@Unremovable
@Startup
public StreamsConfig getStreamsConfig() {
return streamsConfig;
}

@Produces
@Singleton
@Unremovable
public KafkaStreamsTopologyManager kafkaStreamsTopologyManager() {
return kafkaStreamsTopologyManager;
}
Expand All @@ -146,16 +171,15 @@ void onStop(@Observes ShutdownEvent event) {
}
}

private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProperties,
KafkaStreamsRuntimeConfig runtimeConfig, Admin adminClient, Topology topology,
private static KafkaStreams initializeKafkaStreams(StreamsConfig streamsConfig, Topology topology,
Instance<KafkaClientSupplier> kafkaClientSupplier,
Instance<StateListener> stateListener, Instance<StateRestoreListener> globalStateRestoreListener,
Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener, ExecutorService executorService) {
Instance<StreamsUncaughtExceptionHandler> uncaughtExceptionHandlerListener) {
KafkaStreams kafkaStreams;
if (kafkaClientSupplier.isUnsatisfied()) {
kafkaStreams = new KafkaStreams(topology, kafkaStreamsProperties);
kafkaStreams = new KafkaStreams(topology, streamsConfig);
} else {
kafkaStreams = new KafkaStreams(topology, kafkaStreamsProperties, kafkaClientSupplier.get());
kafkaStreams = new KafkaStreams(topology, streamsConfig, kafkaClientSupplier.get());
}

if (!stateListener.isUnsatisfied()) {
Expand All @@ -168,21 +192,6 @@ private static KafkaStreams initializeKafkaStreams(Properties kafkaStreamsProper
kafkaStreams.setUncaughtExceptionHandler(uncaughtExceptionHandlerListener.get());
}

executorService.execute(() -> {
if (runtimeConfig.topicsTimeout.compareTo(Duration.ZERO) > 0) {
try {
waitForTopicsToBeCreated(adminClient, runtimeConfig.getTrimmedTopics(), runtimeConfig.topicsTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
if (!shutdown) {
LOGGER.debug("Starting Kafka Streams pipeline");
kafkaStreams.start();
}
});

return kafkaStreams;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class KafkaStreamsRuntimeConfig {
/**
* Default Kafka bootstrap server.
*/
public static final String DEFAULT_KAFKA_BROKER = "localhost:9012";
public static final String DEFAULT_KAFKA_BROKER = "localhost:9092";

/**
* A unique identifier for this Kafka Streams application.
Expand All @@ -27,7 +27,7 @@ public class KafkaStreamsRuntimeConfig {

/**
* A comma-separated list of host:port pairs identifying the Kafka bootstrap server(s).
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9012}.
* If not set, fallback to {@code kafka.bootstrap.servers}, and if not set either use {@code localhost:9092}.
*/
@ConfigItem(defaultValue = DEFAULT_KAFKA_BROKER)
public List<InetSocketAddress> bootstrapServers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@
import jakarta.enterprise.event.Observes;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

@ApplicationScoped
public class KafkaStreamsEventCounter {

LongAdder eventCount = new LongAdder();

void onKafkaStreamsEvent(@Observes KafkaStreams kafkaStreams) {
void onKafkaStreamsEvent(@Observes KafkaStreams kafkaStreams, StreamsConfig streamsConfig) {
assert kafkaStreams.state() == KafkaStreams.State.CREATED;
assert streamsConfig != null;
eventCount.increment();
}

Expand Down

0 comments on commit 244d353

Please sign in to comment.