From 8aab976dc2df313259cb58b43c8d5082d876df5e Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 26 Aug 2021 17:39:15 +0100 Subject: [PATCH] roachtest: move kafka logs into collected log dir Fixes #69155 Release justification: Non-production code change Release note: None --- pkg/cmd/roachtest/tests/cdc.go | 53 +++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 13 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index aa7623dffe9b..6d9b90dc679a 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -1346,24 +1346,51 @@ func (k kafkaManager) start(ctx context.Context, services ...string) { k.restart(ctx, services...) } -func (k kafkaManager) restart(ctx context.Context, services ...string) { - var startArgs string - if len(services) == 0 { - startArgs = "schema-registry" +var kafkaServices = map[string][]string{ + "zookeeper": {"zookeeper"}, + "kafka": {"zookeeper", "kafka"}, + "schema-registry": {"zookeeper", "kafka", "schema-registry"}, +} + +func (k kafkaManager) kafkaServicesForTargets(targets []string) []string { + var services []string + for _, tgt := range targets { + if s, ok := kafkaServices[tgt]; ok { + services = append(services, s...) + } else { + k.t.Fatalf("unknown kafka start target %q", tgt) + } + } + return services +} + +func (k kafkaManager) restart(ctx context.Context, targetServices ...string) { + var services []string + if len(targetServices) == 0 { + services = kafkaServices["schema-registry"] } else { - startArgs = strings.Join(services, " ") + services = k.kafkaServicesForTargets(targetServices) } k.c.Run(ctx, k.nodes, "touch", k.serverJAASConfig()) + for _, svcName := range services { + // The confluent tool applies the KAFKA_OPTS to all + // services. Also, the kafka.logs.dir is used by each + // service, despite the name. + opts := fmt.Sprintf("-Djava.security.auth.login.config=%s -Dkafka.logs.dir=%s", + k.serverJAASConfig(), + fmt.Sprintf("logs/%s", svcName), + ) + startCmd := fmt.Sprintf( + "CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS='%s' %s local services %s start", + k.basePath(), + k.confluentHome(), + opts, + k.confluentBin(), + svcName) + k.c.Run(ctx, k.nodes, startCmd) + } - startCmd := fmt.Sprintf( - "CONFLUENT_CURRENT=%s CONFLUENT_HOME=%s KAFKA_OPTS=-Djava.security.auth.login.config=%s %s local services %s start", - k.basePath(), - k.confluentHome(), - k.serverJAASConfig(), - k.confluentBin(), - startArgs) - k.c.Run(ctx, k.nodes, startCmd) } func (k kafkaManager) makeCommand(exe string, args ...string) string {