Skip to content

Commit

Permalink
chore: use concluent-local Docker image
Browse files Browse the repository at this point in the history
  • Loading branch information
mdelapenya committed Sep 15, 2023
1 parent 74f10dc commit d10d28e
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
4 changes: 2 additions & 2 deletions docs/modules/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ When starting the Kafka container, you can pass options in a variadic way to con
#### Image

If you need to set a different Kafka Docker image, you can use `testcontainers.WithImage` with a valid Docker image
for Kafka. E.g. `testcontainers.WithImage("confluentinc/cp-kafka:7.3.3")`.
for Kafka. E.g. `testcontainers.WithImage("confluentinc/confluent-local:7.5.0")`.

!!! warning
The minimal required version of Kafka for KRaft mode is `confluentinc/cp-kafka:7.0.0`. If you are using an image that
The minimal required version of Kafka for KRaft mode is `confluentinc/confluent-local:7.4.0`. If you are using an image that
is different from the official one, please make sure that it's compatible with KRaft mode, as the module won't check
the version for you.

Expand Down
2 changes: 1 addition & 1 deletion modules/kafka/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func ExampleRunContainer() {

kafkaContainer, err := kafka.RunContainer(ctx,
kafka.WithClusterID("test-cluster"),
testcontainers.WithImage("confluentinc/cp-kafka:7.3.3"),
testcontainers.WithImage("confluentinc/confluent-local:7.5.0"),
)
if err != nil {
panic(err)
Expand Down
9 changes: 5 additions & 4 deletions modules/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,12 @@ type KafkaContainer struct {
// RunContainer creates an instance of the Kafka container type
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*KafkaContainer, error) {
req := testcontainers.ContainerRequest{
Image: "confluentinc/cp-kafka:7.3.3",
Image: "confluentinc/confluent-local:7.5.0",
ExposedPorts: []string{string(publicPort)},
Env: map[string]string{
// envVars {
"KAFKA_LISTENERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_REST_BOOTSTRAP_SERVERS": "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094",
"KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT",
"KAFKA_INTER_BROKER_LISTENER_NAME": "BROKER",
"KAFKA_BROKER_ID": "1",
Expand Down Expand Up @@ -169,7 +170,7 @@ func validateKRaftVersion(fqName string) error {
image := fqName[:strings.LastIndex(fqName, ":")]
version := fqName[strings.LastIndex(fqName, ":")+1:]

if !strings.EqualFold(image, "confluentinc/cp-kafka") {
if !strings.EqualFold(image, "confluentinc/confluent-local") {
// do not validate if the image is not the official one.
// not raising an error here, letting the image to start and
// eventually evaluate an error if it exists.
Expand All @@ -181,8 +182,8 @@ func validateKRaftVersion(fqName string) error {
version = fmt.Sprintf("v%s", version)
}

if semver.Compare(version, "v7.0.0") < 0 { // version < v7.0.0
return fmt.Errorf("version=%s. KRaft mode is only available since version 7.0.0", version)
if semver.Compare(version, "v7.4.0") < 0 { // version < v7.4.0
return fmt.Errorf("version=%s. KRaft mode is only available since version 7.4.0", version)
}

return nil
Expand Down
14 changes: 7 additions & 7 deletions modules/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestKafka(t *testing.T) {

ctx := context.Background()

kafkaContainer, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/cp-kafka:7.3.3"))
kafkaContainer, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/confluent-local:7.5.0"))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestKafka(t *testing.T) {
func TestKafka_invalidVersion(t *testing.T) {
ctx := context.Background()

_, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/cp-kafka:6.3.3"))
_, err := RunContainer(ctx, WithClusterID("kraftCluster"), testcontainers.WithImage("confluentinc/confluent-local:6.3.3"))
if err == nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,30 +149,30 @@ func TestConfigureQuorumVoters(t *testing.T) {
}
}

func TestVAlidateKRaftVersion(t *testing.T) {
func TestValidateKRaftVersion(t *testing.T) {
tests := []struct {
name string
image string
wantErr bool
}{
{
name: "Official: valid version",
image: "confluentinc/cp-kafka:7.3.3",
image: "confluentinc/confluent-local:7.5.0",
wantErr: false,
},
{
name: "Official: valid, limit version",
image: "confluentinc/cp-kafka:7.0.0",
image: "confluentinc/confluent-local:7.4.0",
wantErr: false,
},
{
name: "Official: invalid, low version",
image: "confluentinc/cp-kafka:6.99.99",
image: "confluentinc/confluent-local:7.3.99",
wantErr: true,
},
{
name: "Official: invalid, too low version",
image: "confluentinc/cp-kafka:5.0.0",
image: "confluentinc/confluent-local:5.0.0",
wantErr: true,
},
{
Expand Down

0 comments on commit d10d28e

Please sign in to comment.