diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index d1cda656ebbf..f1457ed97a92 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -59,6 +59,12 @@ const ( ledgerWorkloadType workloadType = "ledger" ) +// kafkaCreateTopicRetryDuration is the retry duration we use while +// trying to create a Kafka topic after setting it up on a +// node. Without retrying, a `kafka controller not available` error is +// seen with a 1-5% probability +var kafkaCreateTopicRetryDuration = 1 * time.Minute + type sinkType int32 const ( @@ -1537,14 +1543,16 @@ func (k kafkaManager) schemaRegistryURL(ctx context.Context) string { func (k kafkaManager) createTopic(ctx context.Context, topic string) error { kafkaAddrs := []string{k.consumerURL(ctx)} config := sarama.NewConfig() - admin, err := sarama.NewClusterAdmin(kafkaAddrs, config) - if err != nil { - return errors.Wrap(err, "admin client") - } - return admin.CreateTopic(topic, &sarama.TopicDetail{ - NumPartitions: 1, - ReplicationFactor: 1, - }, false) + return retry.ForDuration(kafkaCreateTopicRetryDuration, func() error { + admin, err := sarama.NewClusterAdmin(kafkaAddrs, config) + if err != nil { + return errors.Wrap(err, "admin client") + } + return admin.CreateTopic(topic, &sarama.TopicDetail{ + NumPartitions: 1, + ReplicationFactor: 1, + }, false) + }) } func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) { diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index de9283c9eb36..0731fe2b6424 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -23,7 +23,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry" "github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test" "github.com/cockroachdb/cockroach/pkg/util/randutil" - "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/version" @@ -103,14 +102,8 @@ func newCDCMixedVersionTester( func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) { t.Status("starting Kafka node") cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes) - - // try to create a Kafka topic for some time. Without waiting, a - // `kafka controller not available` error is seen with 1-5% - // probability - if err := retry.ForDuration(1*time.Minute, func() error { - return cmvt.kafka.createTopic(cmvt.ctx, targetTable) - }); err != nil { - t.Fatal(fmt.Errorf("timed out trying to create kafka topic: %w", err)) + if err := cmvt.kafka.createTopic(cmvt.ctx, targetTable); err != nil { + t.Fatal(err) } }