diff --git a/docs/modules/kafka.md b/docs/modules/kafka.md index efebbfa018..8971a2725f 100644 --- a/docs/modules/kafka.md +++ b/docs/modules/kafka.md @@ -38,10 +38,10 @@ When starting the Kafka container, you can pass options in a variadic way to con #### Image If you need to set a different Kafka Docker image, you can use `testcontainers.WithImage` with a valid Docker image -for Kafka. E.g. `testcontainers.WithImage("confluentinc/cp-kafka:7.3.3")`. +for Kafka. E.g. `testcontainers.WithImage("confluentinc/confluent-local:7.5.0")`. !!! warning - The minimal required version of Kafka for KRaft mode is `confluentinc/cp-kafka:7.0.0`. If you are using an image that + The minimal required version of Kafka for KRaft mode is `confluentinc/confluent-local:7.4.0`. If you are using an image that is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check the version for you. diff --git a/modules/kafka/examples_test.go b/modules/kafka/examples_test.go index c5a23b289a..61fe81f504 100644 --- a/modules/kafka/examples_test.go +++ b/modules/kafka/examples_test.go @@ -14,7 +14,7 @@ func ExampleRunContainer() { kafkaContainer, err := kafka.RunContainer(ctx, kafka.WithClusterID("test-cluster"), - testcontainers.WithImage("confluentinc/cp-kafka:7.3.3"), + testcontainers.WithImage("confluentinc/confluent-local:7.5.0"), ) if err != nil { panic(err) diff --git a/modules/kafka/kafka.go b/modules/kafka/kafka.go index 45fd2a5fe5..016509229b 100644 --- a/modules/kafka/kafka.go +++ b/modules/kafka/kafka.go @@ -39,11 +39,12 @@ type KafkaContainer struct { // RunContainer creates an instance of the Kafka container type func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) { req := testcontainers.ContainerRequest{ - Image: "confluentinc/cp-kafka:7.3.3", + Image: "confluentinc/confluent-local:7.5.0", ExposedPorts: []string{string(publicPort)}, Env: map[string]string{ // envVars { "KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", + "KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094", "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT", "KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER", "KAFKA_BROKER_ID": "1", @@ -169,7 +170,7 @@ func validateKRaftVersion(fqName string) error { image := fqName[:strings.LastIndex(fqName, ":")] version := fqName[strings.LastIndex(fqName, ":")+1:] - if !strings.EqualFold(image, "confluentinc/cp-kafka") { + if !strings.EqualFold(image, "confluentinc/confluent-local") { // do not validate if the image is not the official one. // not raising an error here, letting the image to start and // eventually evaluate an error if it exists. @@ -181,8 +182,8 @@ func validateKRaftVersion(fqName string) error { version = fmt.Sprintf("v%s", version) } - if semver.Compare(version, "v7.0.0") < 0 { // version < v7.0.0 - return fmt.Errorf("version=%s. KRaft mode is only available since version 7.0.0", version) + if semver.Compare(version, "v7.4.0") < 0 { // version < v7.4.0 + return fmt.Errorf("version=%s. KRaft mode is only available since version 7.4.0", version) } return nil diff --git a/modules/kafka/kafka_test.go b/modules/kafka/kafka_test.go index 4edc9095e7..821ca66fef 100644 --- a/modules/kafka/kafka_test.go +++ b/modules/kafka/kafka_test.go @@ -15,7 +15,7 @@ func TestKafka(t *testing.T) { ctx := context.Background() - kafkaContainer, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/cp-kafka:7.3.3")) + kafkaContainer, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/confluent-local:7.5.0")) if err != nil { t.Fatal(err) } @@ -87,7 +87,7 @@ func TestKafka(t *testing.T) { func TestKafka_invalidVersion(t *testing.T) { ctx := context.Background() - _, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/cp-kafka:6.3.3")) + _, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/confluent-local:6.3.3")) if err == nil { t.Fatal(err) } @@ -149,7 +149,7 @@ func TestConfigureQuorumVoters(t *testing.T) { } } -func TestVAlidateKRaftVersion(t *testing.T) { +func TestValidateKRaftVersion(t *testing.T) { tests := []struct { name string image string @@ -157,22 +157,22 @@ func TestVAlidateKRaftVersion(t *testing.T) { }{ { name: "Official: valid version", - image: "confluentinc/cp-kafka:7.3.3", + image: "confluentinc/confluent-local:7.5.0", wantErr: false, }, { name: "Official: valid, limit version", - image: "confluentinc/cp-kafka:7.0.0", + image: "confluentinc/confluent-local:7.4.0", wantErr: false, }, { name: "Official: invalid, low version", - image: "confluentinc/cp-kafka:6.99.99", + image: "confluentinc/confluent-local:7.3.99", wantErr: true, }, { name: "Official: invalid, too low version", - image: "confluentinc/cp-kafka:5.0.0", + image: "confluentinc/confluent-local:5.0.0", wantErr: true, }, {