diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 40a2db252886..01f1b69aa07a 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -241,6 +241,10 @@ func (ct *cdcTester) setupSink(args feedArgs) string { kafka.install(ct.ctx) kafka.start(ct.ctx, "kafka") + if args.kafkaQuota > 0 { + kafka.setProducerQuota(ct.ctx, args.kafkaQuota) + } + if args.kafkaChaos { ct.mon.Go(func(ctx context.Context) error { period, downTime := 2*time.Minute, 20*time.Second @@ -505,6 +509,7 @@ type feedArgs struct { assumeRole string tolerateErrors bool sinkURIOverride string + kafkaQuota int cdcFeatureFlags } @@ -1242,6 +1247,34 @@ func registerCDC(r registry.Registry) { ct.waitForWorkload() }, }) + r.Add(registry.TestSpec{ + Name: "cdc/kafka-quota", + Owner: `cdc`, + Benchmark: true, + Cluster: r.MakeClusterSpec(4, spec.CPU(16)), + Leases: registry.MetamorphicLeases, + CompatibleClouds: registry.AllExceptAWS, + Suites: registry.Suites(registry.Nightly), + RequiresLicense: true, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + ct := newCDCTester(ctx, t, c) + defer ct.Close() + + ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "10m"}) + + feed := ct.newChangefeed(feedArgs{ + sinkType: kafkaSink, + targets: allTpccTargets, + opts: map[string]string{"initial_scan": "'no'"}, + kafkaQuota: 1024, + }) + ct.runFeedLatencyVerifier(feed, latencyTargets{ + steadyLatency: 5 * time.Minute, + }) + ct.waitForWorkload() + t.Fatal("failed statement") + }, + }) r.Add(registry.TestSpec{ Name: "cdc/crdb-chaos", Owner: `cdc`, @@ -1995,6 +2028,40 @@ type kafkaManager struct { useKafka2 bool } +func (k kafkaManager) setProducerQuota(ctx context.Context, bytesPerSecond int) { + // bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA + k.t.Status("setting producer quota to %d bytes per second for all users", bytesPerSecond) + //k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"), + // "--bootstrap-server", "localhost:9092", + // "--alter", + // "--add-config", fmt.Sprintf("producer_byte_rate=%d", bytesPerSecond), + // "--entity-type", "users", + // "--entity-name", "default") + // + + k.c.Run(ctx, option.WithNodes(k.kafkaSinkNode), filepath.Join(k.binDir(), "kafka-configs"), + // bootstrap-server=localhost:9092 + "--bootstrap-server", "localhost:9092", + "--alter", + "--add-config", fmt.Sprintf("producer_byte_rate=%d", bytesPerSecond), + "--entity-type", "users", + "--entity-default") + + //k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"), + // "--bootstrap-server", "localhost:9092", + // "--alter", + // "--add-config", "SCRAM-SHA-512=[password=scram512-secret]", + // "--entity-type", "users", + // "--entity-name", "scram512") + // + //k.c.Run(ctx, k.nodes, filepath.Join(k.binDir(), "kafka-configs"), + // "--bootstrap-server", "localhost:9092", + // "--alter", + // "--add-config", "SCRAM-SHA-256=[password=scram256-secret]", + // "--entity-type", "users", + // "--entity-name", "scram256") +} + func (k kafkaManager) basePath() string { if k.c.IsLocal() { return `/tmp/confluent`