Skip to content

Commit

Permalink
Merge #87025
Browse files Browse the repository at this point in the history
87025: roachtest: fix infrastructure flake on cdc/bank r=srosenberg a=renatolabs

When trying to create a Kafka topic immediately after setting Kafka up on
a node, we sometimes see the `kafka controller not available`
error. Retrying the topic creation is sufficient to make progress on
the test.

This was supposed to be included in #86468, but was missed. Now the
retry logic lives in the `createTopic` function itself, so callers no
longer have to remember to wrap this function in a retry loop.

Fixes #86979.

Release justification: test-only change.
Release note: None.

Co-authored-by: Renato Costa <[email protected]>
  • Loading branch information
craig[bot] and renatolabs committed Aug 31, 2022
2 parents 30c7e9c + df70f0d commit 7d379be
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 17 deletions.
24 changes: 16 additions & 8 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ const (
ledgerWorkloadType workloadType = "ledger"
)

// kafkaCreateTopicRetryDuration bounds how long we keep retrying the
// creation of a Kafka topic right after Kafka has been set up on a
// node. A single attempt fails with a `kafka controller not available`
// error with a 1-5% probability, so topic creation is retried for up
// to this long.
var kafkaCreateTopicRetryDuration = time.Minute

type sinkType int32

const (
Expand Down Expand Up @@ -1537,14 +1543,16 @@ func (k kafkaManager) schemaRegistryURL(ctx context.Context) string {
// createTopic creates `topic` on the Kafka node reachable through
// k.consumerURL, with a single partition and a replication factor of 1.
//
// Topic creation is retried for up to kafkaCreateTopicRetryDuration:
// right after Kafka is set up on a node, the first attempts can fail
// with a `kafka controller not available` error. Callers therefore do
// not need to wrap this function in their own retry loop.
//
// It returns the error from retry.ForDuration if no attempt succeeds.
func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
	kafkaAddrs := []string{k.consumerURL(ctx)}
	config := sarama.NewConfig()
	return retry.ForDuration(kafkaCreateTopicRetryDuration, func() error {
		admin, err := sarama.NewClusterAdmin(kafkaAddrs, config)
		if err != nil {
			return errors.Wrap(err, "admin client")
		}
		// Every attempt opens its own admin client; close it so that
		// retries do not leak connections to the broker. The Close error
		// is deliberately ignored: the outcome of CreateTopic is what
		// decides whether this attempt succeeded.
		defer func() { _ = admin.Close() }()
		return admin.CreateTopic(topic, &sarama.TopicDetail{
			NumPartitions:     1,
			ReplicationFactor: 1,
		}, false)
	})
}

func (k kafkaManager) consumer(ctx context.Context, topic string) (*topicConsumer, error) {
Expand Down
11 changes: 2 additions & 9 deletions pkg/cmd/roachtest/tests/mixed_version_cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/version"
Expand Down Expand Up @@ -103,14 +102,8 @@ func newCDCMixedVersionTester(
func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) {
t.Status("starting Kafka node")
cmvt.kafka, cmvt.cleanup = setupKafka(cmvt.ctx, t, c, cmvt.kafkaNodes)

// try to create a Kafka topic for some time. Without waiting, a
// `kafka controller not available` error is seen with 1-5%
// probability
if err := retry.ForDuration(1*time.Minute, func() error {
return cmvt.kafka.createTopic(cmvt.ctx, targetTable)
}); err != nil {
t.Fatal(fmt.Errorf("timed out trying to create kafka topic: %w", err))
if err := cmvt.kafka.createTopic(cmvt.ctx, targetTable); err != nil {
t.Fatal(err)
}
}

Expand Down

0 comments on commit 7d379be

Please sign in to comment.