Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a BuildItem in DevKafkaService to add additional config #25140

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.quarkus.kafka.client.deployment;

import io.quarkus.builder.item.MultiBuildItem;

public final class DevServiceKafkaAdditionalBuildItem extends MultiBuildItem {
private final String config;
gastaldi marked this conversation as resolved.
Show resolved Hide resolved

public DevServiceKafkaAdditionalBuildItem(String config) {
this.config = config;
}

public String getConfig() {
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public DevServicesResultBuildItem startKafkaDevService(
LaunchModeBuildItem launchMode,
KafkaBuildTimeConfig kafkaClientBuildTimeConfig,
List<DevServicesSharedNetworkBuildItem> devServicesSharedNetworkBuildItem,
List<DevServiceKafkaAdditionalBuildItem> additionalBuildItems,
Optional<ConsoleInstalledBuildItem> consoleInstalledBuildItem,
CuratedApplicationShutdownBuildItem closeBuildItem,
LoggingSetupBuildItem loggingSetupBuildItem, GlobalDevServicesConfig devServicesConfig) {
Expand All @@ -93,7 +94,7 @@ public DevServicesResultBuildItem startKafkaDevService(
try {
devService = startKafka(configuration, launchMode,
!devServicesSharedNetworkBuildItem.isEmpty(),
devServicesConfig.timeout);
devServicesConfig.timeout, additionalBuildItems);
if (devService == null) {
compressor.closeAndDumpCaptured();
} else {
Expand Down Expand Up @@ -193,7 +194,8 @@ private void shutdownBroker() {
}

private RunningDevService startKafka(KafkaDevServiceCfg config,
LaunchModeBuildItem launchMode, boolean useSharedNetwork, Optional<Duration> timeout) {
LaunchModeBuildItem launchMode, boolean useSharedNetwork, Optional<Duration> timeout,
List<DevServiceKafkaAdditionalBuildItem> additions) {
if (!config.devServicesEnabled) {
// explicitly disabled
log.debug("Not starting dev services for Kafka, as it has been disabled in the config.");
Expand Down Expand Up @@ -242,7 +244,7 @@ private RunningDevService startKafka(KafkaDevServiceCfg config,
return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
container.getContainerId(),
container::close,
KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers());
getConfigs(container.getBootstrapServers(), additions));
} else {
RedPandaKafkaContainer container = new RedPandaKafkaContainer(
DockerImageName.parse(config.imageName),
Expand All @@ -255,18 +257,28 @@ private RunningDevService startKafka(KafkaDevServiceCfg config,
return new RunningDevService(Feature.KAFKA_CLIENT.getName(),
container.getContainerId(),
container::close,
KAFKA_BOOTSTRAP_SERVERS, container.getBootstrapServers());
getConfigs(container.getBootstrapServers(), additions));
}
};

return maybeContainerAddress
.map(containerAddress -> new RunningDevService(Feature.KAFKA_CLIENT.getName(),
containerAddress.getId(),
null,
KAFKA_BOOTSTRAP_SERVERS, containerAddress.getUrl()))
getConfigs(containerAddress.getUrl(), additions)))
.orElseGet(defaultKafkaBrokerSupplier);
}

private Map<String, String> getConfigs(String address, List<DevServiceKafkaAdditionalBuildItem> additions) {
HashMap<String, String> configs = new HashMap<>();
configs.put(KAFKA_BOOTSTRAP_SERVERS, address);
additions.forEach(c -> {
configs.put(c.getConfig(), address);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BuildItem is used exclusively to set the Kafka's server address as a value for other keys?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, we use it in camel.component.kafka.brokers see apache/camel-quarkus#3742

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also you can check the detail about the motivation of these changes in #25094

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, I am just concerned that the class name DevServiceKafkaAdditionalBuildItem may be too generic for this reason

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, it is hard to name it :) any thought ?

});

return configs;
}

private boolean hasKafkaChannelWithoutBootstrapServers() {
Config config = ConfigProvider.getConfig();
for (String name : config.getPropertyNames()) {
Expand Down