diff --git a/core/src/main/java/org/testcontainers/containers/GenericContainer.java b/core/src/main/java/org/testcontainers/containers/GenericContainer.java index 56ca5345ecd..50e4b927617 100644 --- a/core/src/main/java/org/testcontainers/containers/GenericContainer.java +++ b/core/src/main/java/org/testcontainers/containers/GenericContainer.java @@ -135,7 +135,7 @@ public class GenericContainer> @NonNull private String networkMode; - @NonNull + @Nullable private Network network; @NonNull diff --git a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java index d9c8e35e852..f62ca23eb57 100644 --- a/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java +++ b/modules/kafka/src/main/java/org/testcontainers/containers/KafkaContainer.java @@ -2,12 +2,8 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import lombok.SneakyThrows; -import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.DockerImageName; -import java.nio.charset.StandardCharsets; -import java.util.Comparator; - /** * This container wraps Confluent Kafka and Zookeeper (optionally) * @@ -17,20 +13,14 @@ public class KafkaContainer extends GenericContainer { private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("confluentinc/cp-kafka"); private static final String DEFAULT_TAG = "5.4.3"; - private static final String STARTER_SCRIPT = "/testcontainers_start.sh"; - public static final int KAFKA_PORT = 9093; public static final int ZOOKEEPER_PORT = 2181; private static final String DEFAULT_INTERNAL_TOPIC_RF = "1"; - private static final int PORT_NOT_ASSIGNED = -1; - protected String externalZookeeperConnect = null; - private int port = PORT_NOT_ASSIGNED; - /** * @deprecated use {@link KafkaContainer(DockerImageName)} instead */ @@ -80,83 +70,59 @@ public KafkaContainer withExternalZookeeper(String connectString) { } public String getBootstrapServers() { - if (port == PORT_NOT_ASSIGNED) { - throw new IllegalStateException("You should start Kafka container first"); - } - return String.format("PLAINTEXT://%s:%s", getHost(), port); - } - - @Override - protected void doStart() { - withCommand("sh", "-c", "while [ ! -f " + STARTER_SCRIPT + " ]; do sleep 0.1; done; " + STARTER_SCRIPT); - - if (externalZookeeperConnect == null) { - addExposedPort(ZOOKEEPER_PORT); - } - - super.doStart(); + return String.format("PLAINTEXT://%s:%s", getHost(), getMappedPort(KAFKA_PORT)); } @Override - @SneakyThrows - protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { - super.containerIsStarting(containerInfo, reused); - - port = getMappedPort(KAFKA_PORT); - - if (reused) { - return; - } + protected void configure() { + withEnv( + "KAFKA_ADVERTISED_LISTENERS", + String.format( + "BROKER://%s:9092", + getNetwork() != null + ? getNetworkAliases().get(0) + : "localhost" + ) + ); String command = "#!/bin/bash\n"; - final String zookeeperConnect; if (externalZookeeperConnect != null) { - zookeeperConnect = externalZookeeperConnect; + withEnv("KAFKA_ZOOKEEPER_CONNECT", externalZookeeperConnect); } else { - zookeeperConnect = "localhost:" + ZOOKEEPER_PORT; + addExposedPort(ZOOKEEPER_PORT); + withEnv("KAFKA_ZOOKEEPER_CONNECT", "localhost:" + ZOOKEEPER_PORT); command += "echo 'clientPort=" + ZOOKEEPER_PORT + "' > zookeeper.properties\n"; command += "echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n"; command += "echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n"; command += "zookeeper-server-start zookeeper.properties &\n"; } - command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n"; - - command += "export KAFKA_ADVERTISED_LISTENERS='" + String.join(",", getBootstrapServers(), brokerAdvertisedListener(containerInfo)) + "'\n"; - - command += ". /etc/confluent/docker/bash-config \n"; - command += "/etc/confluent/docker/configure \n"; - command += "/etc/confluent/docker/launch \n"; + // Optimization: skip the checks + command += "echo '' > /etc/confluent/docker/ensure \n"; + // Run the original command + command += "/etc/confluent/docker/run \n"; + withCommand("sh", "-c", command); + } - copyFileToContainer( - Transferable.of(command.getBytes(StandardCharsets.UTF_8), 0777), - STARTER_SCRIPT + @Override + @SneakyThrows + protected void containerIsStarted(InspectContainerResponse containerInfo) { + String brokerAdvertisedListener = brokerAdvertisedListener(containerInfo); + ExecResult result = execInContainer( + "kafka-configs", + "--alter", + "--bootstrap-server", brokerAdvertisedListener, + "--entity-type", "brokers", + "--entity-name", getEnvMap().get("KAFKA_BROKER_ID"), + "--add-config", + "advertised.listeners=[" + String.join(",", getBootstrapServers(), brokerAdvertisedListener) + "]" ); + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.getStderr()); + } } protected String brokerAdvertisedListener(InspectContainerResponse containerInfo) { - // Kafka supports only one INTER_BROKER listener, so we have to pick one. - // The current algorithm uses the following order of resolving the IP: - // 1. Custom network's IP set via `withNetwork` - // 2. Bridge network's IP - // 3. Best effort fallback to getNetworkSettings#ipAddress - String ipAddress = containerInfo.getNetworkSettings().getNetworks().entrySet() - .stream() - .filter(it -> it.getValue().getIpAddress() != null) - .max(Comparator.comparingInt(entry -> { - if (getNetwork() != null && getNetwork().getId().equals(entry.getValue().getNetworkID())) { - return 2; - } - - if ("bridge".equals(entry.getKey())) { - return 1; - } - - return 0; - })) - .map(it -> it.getValue().getIpAddress()) - .orElseGet(() -> containerInfo.getNetworkSettings().getIpAddress()); - - return String.format("BROKER://%s:%s", ipAddress, "9092"); + return String.format("BROKER://%s:%s", containerInfo.getConfig().getHostName(), "9092"); } }