diff --git a/extensions/kafka-client/deployment/pom.xml b/extensions/kafka-client/deployment/pom.xml index 0f7e820ce8c28..720455817f9e3 100644 --- a/extensions/kafka-client/deployment/pom.xml +++ b/extensions/kafka-client/deployment/pom.xml @@ -53,6 +53,10 @@ io.quarkus quarkus-junit4-mock + + io.quarkus + quarkus-devservices-common + diff --git a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java index 69c2d2afc0316..c5732eed2feb8 100644 --- a/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java +++ b/extensions/kafka-client/deployment/src/main/java/io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.java @@ -2,13 +2,11 @@ import java.io.Closeable; import java.nio.charset.StandardCharsets; -import java.util.List; import java.util.Objects; import org.eclipse.microprofile.config.Config; import org.eclipse.microprofile.config.ConfigProvider; import org.jboss.logging.Logger; -import org.testcontainers.DockerClientFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.Wait; @@ -16,8 +14,6 @@ import org.testcontainers.utility.DockerImageName; import com.github.dockerjava.api.command.InspectContainerResponse; -import com.github.dockerjava.api.model.Container; -import com.github.dockerjava.api.model.ContainerPort; import io.quarkus.bootstrap.classloading.QuarkusClassLoader; import io.quarkus.deployment.IsDockerWorking; @@ -28,6 +24,7 @@ import io.quarkus.deployment.builditem.LaunchModeBuildItem; import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem; import io.quarkus.deployment.builditem.ServiceStartBuildItem; +import io.quarkus.devservices.common.ContainerLocator; import io.quarkus.runtime.LaunchMode; import io.quarkus.runtime.configuration.ConfigUtils; @@ -46,6 +43,8 @@ public class DevServicesKafkaProcessor { */ private static final String DEV_SERVICE_LABEL = "quarkus-dev-service-kafka"; + private static final ContainerLocator kafkaContainerLocator = new ContainerLocator(DEV_SERVICE_LABEL, KAFKA_PORT); + static volatile Closeable closeable; static volatile KafkaDevServiceCfg cfg; static volatile boolean first = true; @@ -138,30 +137,8 @@ private void shutdownBroker() { } } - private static Container lookup(String expectedLabelValue) { - List containers = DockerClientFactory.lazyClient().listContainersCmd().exec(); - for (Container container : containers) { - String s = container.getLabels().get(DEV_SERVICE_LABEL); - if (expectedLabelValue.equalsIgnoreCase(s)) { - return container; - } - } - return null; - } - - private static ContainerPort getMappedPort(Container container, int port) { - for (ContainerPort p : container.getPorts()) { - Integer mapped = p.getPrivatePort(); - Integer publicPort = p.getPublicPort(); - if (mapped != null && mapped == port && publicPort != null) { - return p; - } - } - return null; - } - private KafkaBroker startKafka(KafkaDevServiceCfg config, - LaunchModeBuildItem launchMode) { + LaunchModeBuildItem launchMode) { if (!config.devServicesEnabled) { // explicitly disabled log.debug("Not starting dev services for Kafka, as it has been disabled in the config."); @@ -188,18 +165,8 @@ private KafkaBroker startKafka(KafkaDevServiceCfg config, if (config.shared && launchMode.getLaunchMode() == LaunchMode.DEVELOPMENT) { // Detect if there is a broker already started using container labels. - Container container = lookup(config.serviceName); - if (container != null) { - ContainerPort port = getMappedPort(container, KAFKA_PORT); - if (port != null) { - String url = port.getIp() + ":" + port.getPublicPort(); - log.infof("Dev Services for Kafka container found: %s (%s). " - + "Connecting to: %s.", - container.getId(), - container.getImage(), url); - return new KafkaBroker(url, null); - } - } + final String url = kafkaContainerLocator.locateContainer(config.serviceName); + return new KafkaBroker(url, null); } // Starting the broker