diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index aa7623dffe9b..6d9b90dc679a 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1346,24 +1346,51 @@ func (k kafkaManager) start(ctx context.Context, services ...string) { k.restart(ctx, services...) } -func (k kafkaManager) restart(ctx context.Context, services ...string) { - var startArgs string - if len(services) == 0 { - startArgs = "schema-registry" +var kafkaServices = map[string][]string{ + "zookeeper": {"zookeeper"}, + "kafka": {"zookeeper", "kafka"}, + "schema-registry": {"zookeeper", "kafka", "schema-registry"}, +} + +func (k kafkaManager) kafkaServicesForTargets(targets []string) []string { + var services []string + for _, tgt := range targets { + if s, ok := kafkaServices[tgt]; ok { + services = append(services, s...) + } else { + k.t.Fatalf("unknown kafka start target %q", tgt) + } + } + return services +} + +func (k kafkaManager) restart(ctx context.Context, targetServices ...string) { + var services []string + if len(targetServices) == 0 { + services = kafkaServices["schema-registry"] } else { - startArgs = strings.Join(services, " ") + services = k.kafkaServicesForTargets(targetServices) } k.c.Run(ctx, k.nodes, "touch", k.serverJAASConfig()) + for _, svcName := range services { + // The confluent tool applies the KAFKA_OPTS to all + // services. Also, the kafka.logs.dir is used by each + // service, despite the name. + opts := fmt.Sprintf("-Djava.security.auth.login.config=%s -Dkafka.logs.dir=%s", + k.serverJAASConfig(), + fmt.Sprintf("logs/%s", svcName), + ) + startCmd := fmt.Sprintf( + "CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS='%s' %s local services %s start", + k.basePath(), + k.confluentHome(), + opts, + k.confluentBin(), + svcName) + k.c.Run(ctx, k.nodes, startCmd) + } - startCmd := fmt.Sprintf( - "CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS=-Djava.security.auth.login.config=%s %s local services %s start", - k.basePath(), - k.confluentHome(), - k.serverJAASConfig(), - k.confluentBin(), - startArgs) - k.c.Run(ctx, k.nodes, startCmd) } func (k kafkaManager) makeCommand(exe string, args ...string) string {