Skip to content

Commit

Permalink
Kafka Kraft Mode support via confluent-local image (#623)
Browse files Browse the repository at this point in the history
* Kafka Kraft Mode support via confluent-local image

Fixes: #622

* Allow custom image specification with kraft mode
  • Loading branch information
scprek authored Oct 11, 2024
1 parent c7a4f90 commit 99e99f1
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 7 deletions.
13 changes: 13 additions & 0 deletions src/main/docs/guide/modules-kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,16 @@ Kafka support will automatically start a https://kafka.apache.org[Kafka containe
The default image can be overwritten by setting the `test-resources.containers.kafka.image-name` property.

TIP: See the guide for https://guides.micronaut.io/latest/testing-micronaut-kafka-listener-using-testcontainers.html[Testing Kafka Listener using Testcontainers with the Micronaut Framework] to learn more.


https://docs.confluent.io/platform/current/kafka-metadata/kraft.html#kraft-overview[Kraft Mode] is supported via the `test-resources.containers.kafka.kraft` property.

[configuration]
----
test-resources:
containers:
kafka:
kraft: true
----

NOTE: This switches to the https://docs.confluent.io/platform/current/installation/docker/image-reference.html#ak-images[confluent-local] Docker image with https://java.testcontainers.org/modules/kafka/#using-kraft-mode[TestContainer Kraft Support]
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2017-2024 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.testresources.kafka;

import java.util.Map;

/**
* Internal class to deal with the test resources
* kafka configuration block.
*/
abstract class KafkaConfigurationSupport {
public static final String CONFIG_KAFKA_KRAFT_MODE = "containers.kafka.kraft";

private KafkaConfigurationSupport() {

}

/**
* Start Kafka container in Kraft mode with the confluent-local image.
* See: <a href="https://docs.confluent.io/platform/current/installation/docker/image-reference.html#ak-images">Confluent Kafka Images</a>
* See: <a href="https://java.testcontainers.org/modules/kafka/#using-kraft-mode">TestContainers Kafka Kraft Mode</a>
*/
static boolean isKraftMode(Map<String, Object> testResourcesConfig) {
Boolean clusterMode = (Boolean) testResourcesConfig.getOrDefault(CONFIG_KAFKA_KRAFT_MODE, false);
return Boolean.TRUE.equals(clusterMode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;

import static io.micronaut.testresources.kafka.KafkaConfigurationSupport.isKraftMode;

/**
* A test resource provider which will spawn a Kafka test container.
Expand All @@ -32,7 +30,16 @@ public class KafkaTestResourceProvider extends AbstractTestContainersProvider<Ka

public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
public static final String DEFAULT_IMAGE = "confluentinc/cp-kafka:7.0.4";
/**
* Leverage confluent-local image as it is optimized for local development and the image enables
* KRaft mode with no configuration setup.
* See: <a href="https://docs.confluent.io/platform/current/installation/docker/image-reference.html#ak-images">Confluent Kafka Images</a>
*/
public static final String DEFAULT_KRAFT_IMAGE = "confluentinc/confluent-local:7.6.0";

public static final String DISPLAY_NAME = "Kafka";
public static final String SIMPLE_NAME = "kafka";
public static final List<String> SUPPORTED_PROPERTIES_LIST = List.of(KAFKA_BOOTSTRAP_SERVERS);

@Override
public List<String> getResolvableProperties(Map<String, Collection<String>> propertyEntries, Map<String, Object> testResourcesConfig) {
Expand All @@ -46,7 +53,7 @@ public String getDisplayName() {

@Override
protected String getSimpleName() {
return "kafka";
return SIMPLE_NAME;
}

@Override
Expand All @@ -56,7 +63,10 @@ protected String getDefaultImageName() {

@Override
protected KafkaContainer createContainer(DockerImageName imageName, Map<String, Object> requestedProperties, Map<String, Object> testResourcesConfig) {
return new KafkaContainer(imageName);
boolean isCustomImage = !imageName.toString().equals(getDefaultImageName());
return isKraftMode(testResourcesConfig) ?
new KafkaContainer(isCustomImage ? imageName : DockerImageName.parse(DEFAULT_KRAFT_IMAGE).asCompatibleSubstituteFor("confluentinc/cp-kafka")).withKraft() :
new KafkaContainer(imageName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package io.micronaut.testresources.kafka

import io.micronaut.context.ApplicationContext
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import io.micronaut.testresources.core.Scope
import io.micronaut.testresources.testcontainers.TestContainers
import jakarta.inject.Inject

@MicronautTest(environments = "kraft")
class CustomKafkaImageKraftTest extends AbstractKafkaSpec {

@Override
Map<String, String> getProperties() {
super.properties + [
"test-resources.containers.kafka.image-name": "confluentinc/confluent-local:7.7.1"
]
}

@Override
String getImageName() {
'confluent-local'
}

@Inject
ApplicationContext applicationContext

/**
* Note this test will fail if not launched in kraft mode (i.e. environments = kraft) as no zookeeper was configured.
* So there are no assertions in the test to check if kraft mode was enabled.
*
* Example Error:
*
* 10:22:21.672 [main] ERROR t.confluentinc/confluent-local:7.7.1 - Could not start container java.lang.IllegalStateException: Wait strategy failed. Container exited with code 1
*
* Docker Logs:
* 2024-10-11 10:21:22 [2024-10-11 14:21:22,411] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
* 2024-10-11 10:21:22 java.lang.IllegalArgumentException: requirement failed: controller.listener.names must contain at least one value appearing in the 'listeners' configuration when running the KRaft controller role
*/
def "starts Kafka using a custom image with Kraft"() {
when:
def client = applicationContext.getBean(AnalyticsClient)
def result = client.updateAnalytics("oh yeah!")

then:
listContainers().size() == 1
result.block() == "oh yeah!"
with(TestContainers.listByScope("kafka").get(Scope.of("kafka"))) {
size() == 1
get(0).dockerImageName == "confluentinc/confluent-local:7.7.1"
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.micronaut.testresources.kafka

import io.micronaut.context.ApplicationContext
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import io.micronaut.testresources.core.Scope
import io.micronaut.testresources.testcontainers.TestContainers
import jakarta.inject.Inject

@MicronautTest(environments = "kraft")
class KafkaKraftStartedTest extends AbstractKafkaSpec {

@Inject
ApplicationContext applicationContext

@Override
String getImageName() {
'confluent-local'
}

def "starts Kafka using a kraft mode and confluent-local image"() {
when:
def client = applicationContext.getBean(AnalyticsClient)
def result = client.updateAnalytics("oh yeah!")

then:
listContainers().size() == 1
result.block() == "oh yeah!"
with(TestContainers.listByScope("kafka").get(Scope.of("kafka"))) {
size() == 1
get(0).dockerImageName == KafkaTestResourceProvider.DEFAULT_KRAFT_IMAGE
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package io.micronaut.testresources.kafka

import io.micronaut.context.ApplicationContext
import io.micronaut.test.extensions.spock.annotation.MicronautTest
import io.micronaut.testresources.core.Scope
import io.micronaut.testresources.testcontainers.TestContainers
import jakarta.inject.Inject

@MicronautTest
Expand All @@ -18,6 +20,10 @@ class KafkaStartedTest extends AbstractKafkaSpec {
then:
listContainers().size() == 1
result.block() == "oh yeah!"
with(TestContainers.listByScope("kafka").get(Scope.of("kafka"))) {
size() == 1
get(0).dockerImageName == KafkaTestResourceProvider.DEFAULT_IMAGE
}
}

}
4 changes: 4 additions & 0 deletions test-resources-kafka/src/test/resources/application-kraft.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
test-resources:
containers:
kafka:
kraft: true

0 comments on commit 99e99f1

Please sign in to comment.