diff --git a/src/main/docs/guide/modules-kafka.adoc b/src/main/docs/guide/modules-kafka.adoc
index c7e9ecb5e..ab4816cbf 100644
--- a/src/main/docs/guide/modules-kafka.adoc
+++ b/src/main/docs/guide/modules-kafka.adoc
@@ -3,3 +3,16 @@ Kafka support will automatically start a https://kafka.apache.org[Kafka containe
The default image can be overwritten by setting the `test-resources.containers.kafka.image-name` property.
TIP: See the guide for https://guides.micronaut.io/latest/testing-micronaut-kafka-listener-using-testcontainers.html[Testing Kafka Listener using Testcontainers with the Micronaut Framework] to learn more.
+
+
+https://docs.confluent.io/platform/current/kafka-metadata/kraft.html#kraft-overview[Kraft Mode] is supported via the `test-resources.containers.kafka.kraft` property.
+
+[configuration]
+----
+test-resources:
+ containers:
+ kafka:
+ kraft: true
+----
+
+NOTE: This switches to the https://docs.confluent.io/platform/current/installation/docker/image-reference.html#ak-images[confluent-local] Docker image with https://java.testcontainers.org/modules/kafka/#using-kraft-mode[TestContainer Kraft Support]
diff --git a/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaConfigurationSupport.java b/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaConfigurationSupport.java
new file mode 100644
index 000000000..dfac0dc0f
--- /dev/null
+++ b/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaConfigurationSupport.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2017-2024 original authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.micronaut.testresources.kafka;
+
+import java.util.Map;
+
+/**
+ * Internal class to deal with the test resources
+ * kafka configuration block.
+ */
+abstract class KafkaConfigurationSupport {
+ public static final String CONFIG_KAFKA_KRAFT_MODE = "containers.kafka.kraft";
+
+ private KafkaConfigurationSupport() {
+
+ }
+
+ /**
+ * Start Kafka container in Kraft mode with the confluent-local image.
+ * See: Confluent Kafka Images
+ * See: TestContainers Kafka Kraft Mode
+ */
+ static boolean isKraftMode(Map testResourcesConfig) {
+ Boolean clusterMode = (Boolean) testResourcesConfig.getOrDefault(CONFIG_KAFKA_KRAFT_MODE, false);
+ return Boolean.TRUE.equals(clusterMode);
+ }
+}
diff --git a/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaTestResourceProvider.java b/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaTestResourceProvider.java
index 02cccc8e6..8c22cc95f 100644
--- a/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaTestResourceProvider.java
+++ b/test-resources-kafka/src/main/java/io/micronaut/testresources/kafka/KafkaTestResourceProvider.java
@@ -19,11 +19,9 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
+
+import static io.micronaut.testresources.kafka.KafkaConfigurationSupport.isKraftMode;
/**
* A test resource provider which will spawn a Kafka test container.
@@ -32,7 +30,16 @@ public class KafkaTestResourceProvider extends AbstractTestContainersProviderConfluent Kafka Images
+ */
+ public static final String DEFAULT_KRAFT_IMAGE = "confluentinc/confluent-local:7.6.0";
+
public static final String DISPLAY_NAME = "Kafka";
+ public static final String SIMPLE_NAME = "kafka";
+ public static final List SUPPORTED_PROPERTIES_LIST = List.of(KAFKA_BOOTSTRAP_SERVERS);
@Override
public List getResolvableProperties(Map> propertyEntries, Map testResourcesConfig) {
@@ -46,7 +53,7 @@ public String getDisplayName() {
@Override
protected String getSimpleName() {
- return "kafka";
+ return SIMPLE_NAME;
}
@Override
@@ -56,7 +63,10 @@ protected String getDefaultImageName() {
@Override
protected KafkaContainer createContainer(DockerImageName imageName, Map requestedProperties, Map testResourcesConfig) {
- return new KafkaContainer(imageName);
+ boolean isCustomImage = !imageName.toString().equals(getDefaultImageName());
+ return isKraftMode(testResourcesConfig) ?
+ new KafkaContainer(isCustomImage ? imageName : DockerImageName.parse(DEFAULT_KRAFT_IMAGE).asCompatibleSubstituteFor("confluentinc/cp-kafka")).withKraft() :
+ new KafkaContainer(imageName);
}
@Override
diff --git a/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/CustomKafkaImageKraftTest.groovy b/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/CustomKafkaImageKraftTest.groovy
new file mode 100644
index 000000000..ada8d804b
--- /dev/null
+++ b/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/CustomKafkaImageKraftTest.groovy
@@ -0,0 +1,53 @@
+package io.micronaut.testresources.kafka
+
+import io.micronaut.context.ApplicationContext
+import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import io.micronaut.testresources.core.Scope
+import io.micronaut.testresources.testcontainers.TestContainers
+import jakarta.inject.Inject
+
+@MicronautTest(environments = "kraft")
+class CustomKafkaImageKraftTest extends AbstractKafkaSpec {
+
+ @Override
+ Map getProperties() {
+ super.properties + [
+ "test-resources.containers.kafka.image-name": "confluentinc/confluent-local:7.7.1"
+ ]
+ }
+
+ @Override
+ String getImageName() {
+ 'confluent-local'
+ }
+
+ @Inject
+ ApplicationContext applicationContext
+
+ /**
+ * Note this test will fail if not launched in kraft mode (i.e. environments = kraft) as no zookeeper was configured.
+ * So there are no assertions in the test to check if kraft mode was enabled.
+ *
+ * Example Error:
+ *
+ * 10:22:21.672 [main] ERROR t.confluentinc/confluent-local:7.7.1 - Could not start container java.lang.IllegalStateException: Wait strategy failed. Container exited with code 1
+ *
+ * Docker Logs:
+ * 2024-10-11 10:21:22 [2024-10-11 14:21:22,411] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
+ * 2024-10-11 10:21:22 java.lang.IllegalArgumentException: requirement failed: controller.listener.names must contain at least one value appearing in the 'listeners' configuration when running the KRaft controller role
+ */
+ def "starts Kafka using a custom image with Kraft"() {
+ when:
+ def client = applicationContext.getBean(AnalyticsClient)
+ def result = client.updateAnalytics("oh yeah!")
+
+ then:
+ listContainers().size() == 1
+ result.block() == "oh yeah!"
+ with(TestContainers.listByScope("kafka").get(Scope.of("kafka"))) {
+ size() == 1
+ get(0).dockerImageName == "confluentinc/confluent-local:7.7.1"
+ }
+ }
+
+}
diff --git a/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaKraftStartedTest.groovy b/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaKraftStartedTest.groovy
new file mode 100644
index 000000000..a72b0681e
--- /dev/null
+++ b/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaKraftStartedTest.groovy
@@ -0,0 +1,33 @@
+package io.micronaut.testresources.kafka
+
+import io.micronaut.context.ApplicationContext
+import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import io.micronaut.testresources.core.Scope
+import io.micronaut.testresources.testcontainers.TestContainers
+import jakarta.inject.Inject
+
+@MicronautTest(environments = "kraft")
+class KafkaKraftStartedTest extends AbstractKafkaSpec {
+
+ @Inject
+ ApplicationContext applicationContext
+
+ @Override
+ String getImageName() {
+ 'confluent-local'
+ }
+
+ def "starts Kafka using a kraft mode and confluent-local image"() {
+ when:
+ def client = applicationContext.getBean(AnalyticsClient)
+ def result = client.updateAnalytics("oh yeah!")
+
+ then:
+ listContainers().size() == 1
+ result.block() == "oh yeah!"
+ with(TestContainers.listByScope("kafka").get(Scope.of("kafka"))) {
+ size() == 1
+ get(0).dockerImageName == KafkaTestResourceProvider.DEFAULT_KRAFT_IMAGE
+ }
+ }
+}
diff --git a/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaStartedTest.groovy b/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaStartedTest.groovy
index 6516c7e01..9acdf9eba 100644
--- a/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaStartedTest.groovy
+++ b/test-resources-kafka/src/test/groovy/io/micronaut/testresources/kafka/KafkaStartedTest.groovy
@@ -2,6 +2,8 @@ package io.micronaut.testresources.kafka
import io.micronaut.context.ApplicationContext
import io.micronaut.test.extensions.spock.annotation.MicronautTest
+import io.micronaut.testresources.core.Scope
+import io.micronaut.testresources.testcontainers.TestContainers
import jakarta.inject.Inject
@MicronautTest
@@ -18,6 +20,10 @@ class KafkaStartedTest extends AbstractKafkaSpec {
then:
listContainers().size() == 1
result.block() == "oh yeah!"
+ with(TestContainers.listByScope("kafka").get(Scope.of("kafka"))) {
+ size() == 1
+ get(0).dockerImageName == KafkaTestResourceProvider.DEFAULT_IMAGE
+ }
}
}
diff --git a/test-resources-kafka/src/test/resources/application-kraft.yml b/test-resources-kafka/src/test/resources/application-kraft.yml
new file mode 100644
index 000000000..4b71747b1
--- /dev/null
+++ b/test-resources-kafka/src/test/resources/application-kraft.yml
@@ -0,0 +1,4 @@
+test-resources:
+ containers:
+ kafka:
+ kraft: true