From 34287265d2279cf5ecf297ed86fef36ba28e7ae3 Mon Sep 17 00:00:00 2001 From: Andy Yang Date: Mon, 13 May 2024 17:18:22 -0400 Subject: [PATCH] roachtest: add order validation to CDC Kafka roachtests This patch adds order validation to CDC Kafka roachtests so that we can build more confidence in our ordering guarantees. It can be enabled for a roachtest either by directly setting the `validateOrder` flag on a `kafkaManager` before creating consumers, or indirectly by setting the `validateOrder` flag on `kafkaFeedArgs` for tests that use `cdcTester`. Release note: None --- pkg/cmd/roachtest/tests/cdc.go | 250 +++++++++++++++---- pkg/cmd/roachtest/tests/mixed_version_cdc.go | 16 +- 2 files changed, 204 insertions(+), 62 deletions(-) diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index ec33bf80f9bf..48d02ad0174f 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -59,6 +59,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -242,17 +243,15 @@ func (ct *cdcTester) setupSink(args feedArgs) string { case pubsubSink: sinkURI = changefeedccl.GcpScheme + `://cockroach-ephemeral` + "?AUTH=implicit&topic_name=pubsubSink-roachtest®ion=us-east1" case kafkaSink: - kafkaNode := ct.kafkaSinkNode() - kafka := kafkaManager{ - t: ct.t, - c: ct.cluster, - kafkaSinkNode: kafkaNode, - mon: ct.mon, + kafka, _ := setupKafka(ct.ctx, ct.t, ct.cluster, ct.kafkaSinkNode()) + kafka.mon = ct.mon + kafka.validateOrder = args.kafkaArgs.validateOrder + + if err := kafka.startTopicConsumers(ct.ctx, args.targets, ct.doneCh); err != nil { + ct.t.Fatal(err) } - kafka.install(ct.ctx) - kafka.start(ct.ctx, "kafka") - if args.kafkaChaos { + if args.kafkaArgs.kafkaChaos { ct.mon.Go(func(ctx context.Context) error { period, downTime := 2*time.Minute, 20*time.Second return kafka.chaosLoop(ctx, period, downTime, ct.doneCh) @@ -513,14 +512,23 @@ type feedArgs struct { sinkType sinkType targets []string opts map[string]string - kafkaChaos bool assumeRole string tolerateErrors bool sinkURIOverride string cdcFeatureFlags + kafkaArgs kafkaFeedArgs +} + +// kafkaFeedArgs are args that are specific to kafkaSink changefeeds. +type kafkaFeedArgs struct { + // If kafkaChaos is true, the Kafka cluster will periodically restart + // to simulate unreliability. + kafkaChaos bool + // If validateOrder is set to true, order validators will be created + // for each topic to validate the changefeed's ordering guarantees. 
+ validateOrder bool } -// TODO: Maybe move away from feedArgs since its only 3 things func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob { ct.t.Status(fmt.Sprintf("initiating %s sink", args.sinkType)) var sinkURI string @@ -545,6 +553,9 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob { } else { feedOptions["resolved"] = "" } + if args.kafkaArgs.validateOrder { + feedOptions["updated"] = "" + } for option, value := range args.opts { feedOptions[option] = value @@ -781,11 +792,11 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { t.Fatal(err) } - tc, err := kafka.newConsumer(ctx, "bank") + tc, err := kafka.newConsumer(ctx, "bank", nil /* stopper */) if err != nil { t.Fatal(errors.Wrap(err, "could not create kafka consumer")) } - defer tc.Close() + defer tc.close() l, err := t.L().ChildLogger(`changefeed`) if err != nil { @@ -814,9 +825,9 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) { defer func() { close(messageBuf) }() v := cdctest.NewCountValidator(cdctest.NoOpValidator) for { - m := tc.Next(ctx) - if m == nil { - return fmt.Errorf("unexpected end of changefeed") + m, err := tc.next(ctx) + if err != nil { + return err } messageBuf <- m updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value) @@ -1363,7 +1374,10 @@ func registerCDC(r registry.Registry) { feed := ct.newChangefeed(feedArgs{ sinkType: kafkaSink, targets: allTpccTargets, - opts: map[string]string{"initial_scan": "'no'"}, + kafkaArgs: kafkaFeedArgs{ + validateOrder: true, + }, + opts: map[string]string{"initial_scan": "'no'"}, }) ct.runFeedLatencyVerifier(feed, latencyTargets{ initialScanLatency: 3 * time.Minute, @@ -1540,10 +1554,13 @@ func registerCDC(r registry.Registry) { ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"}) feed := ct.newChangefeed(feedArgs{ - sinkType: kafkaSink, - targets: allTpccTargets, - kafkaChaos: true, - opts: map[string]string{"initial_scan": "'no'"}, + sinkType: kafkaSink, + targets: allTpccTargets, + kafkaArgs: kafkaFeedArgs{ + kafkaChaos: true, + validateOrder: true, + }, + opts: map[string]string{"initial_scan": "'no'"}, }) ct.runFeedLatencyVerifier(feed, latencyTargets{ initialScanLatency: 3 * time.Minute, @@ -1570,8 +1587,11 @@ func registerCDC(r registry.Registry) { ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m", tolerateErrors: true}) feed := ct.newChangefeed(feedArgs{ - sinkType: kafkaSink, - targets: allTpccTargets, + sinkType: kafkaSink, + targets: allTpccTargets, + kafkaArgs: kafkaFeedArgs{ + validateOrder: true, + }, opts: map[string]string{"initial_scan": "'no'"}, tolerateErrors: true, }) @@ -1607,7 +1627,13 @@ func registerCDC(r registry.Registry) { t.Fatalf("failed statement %q: %s", alterStmt, err.Error()) } - feed := ct.newChangefeed(feedArgs{sinkType: kafkaSink, targets: allLedgerTargets}) + feed := ct.newChangefeed(feedArgs{ + sinkType: kafkaSink, + targets: allLedgerTargets, + kafkaArgs: kafkaFeedArgs{ + validateOrder: true, + }, + }) ct.runFeedLatencyVerifier(feed, latencyTargets{ initialScanLatency: 10 * time.Minute, steadyLatency: time.Minute, @@ -2357,6 +2383,10 @@ type kafkaManager struct { // Our method of requiring OAuth on the broker only works with Kafka 2 useKafka2 bool + + // validateOrder specifies whether consumers created by the + // kafkaManager should create and use order validators. 
+ validateOrder bool } func (k kafkaManager) basePath() string { @@ -2984,7 +3014,9 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error { }) } -func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) { +func (k kafkaManager) newConsumer( + ctx context.Context, topic string, stopper <-chan struct{}, +) (*topicConsumer, error) { kafkaAddrs := []string{k.consumerURL(ctx)} config := sarama.NewConfig() // I was seeing "error processing FetchRequest: kafka: error decoding @@ -2998,14 +3030,66 @@ func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicCons if err != nil { return nil, err } - tc, err := makeTopicConsumer(consumer, topic) + var validator cdctest.Validator + if k.validateOrder { + validator = cdctest.NewOrderValidator(topic) + } + tc, err := newTopicConsumer(k.t, consumer, topic, validator, stopper) if err != nil { _ = consumer.Close() return nil, err } + k.t.L().Printf("topic consumer for %s has partitions: %s", topic, tc.partitions) return tc, nil } +func (k kafkaManager) startTopicConsumers( + ctx context.Context, targets []string, stopper <-chan struct{}, +) error { + for _, target := range targets { + // We need to strip off the database and schema name since the topic name + // is just the table name. + topic := target + lastPeriodIndex := strings.LastIndex(target, ".") + if lastPeriodIndex != -1 { + topic = topic[lastPeriodIndex+1:] + } + + k.t.L().Printf("starting topic consumer for topic: %s", topic) + k.mon.Go(func(ctx context.Context) error { + topicConsumer, err := k.newConsumer(ctx, topic, stopper) + if err != nil { + return err + } + defer topicConsumer.close() + everyN := log.Every(30 * time.Second) + for { + select { + case <-stopper: + return nil + case <-ctx.Done(): + return ctx.Err() + default: + } + + // The topicConsumer has order validation built-in so this has + // the side effect of validating the order of incoming messages. + _, err := topicConsumer.next(ctx) + if err != nil { + k.t.L().Printf("topic consumer for %s encountered error: %s", topic, err) + return err + } + if everyN.ShouldLog() { + k.t.L().Printf("topic consumer for %s validated %d rows and %d resolved timestamps", + topic, topicConsumer.validator.NumRows, topicConsumer.validator.NumResolved) + } + } + }) + } + + return nil +} + type tpccWorkload struct { workloadNodes option.NodeListOption sqlNodes option.NodeListOption @@ -3231,7 +3315,7 @@ func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) { // nightly, but nice for development. 
func stopFeeds(db *gosql.DB) { _, _ = db.Exec(`CANCEL JOBS ( - SELECT job_id FROM [SHOW JOBS] WHERE status = 'running' + SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = 'running' )`) } @@ -3261,54 +3345,118 @@ func setupKafka( } type topicConsumer struct { - sarama.Consumer - + t test.Test + consumer sarama.Consumer topic string partitions []string - partitionConsumers []sarama.PartitionConsumer -} - -func makeTopicConsumer(c sarama.Consumer, topic string) (*topicConsumer, error) { - t := &topicConsumer{Consumer: c, topic: topic} - partitions, err := t.Partitions(t.topic) + partitionConsumers map[int32]sarama.PartitionConsumer + validator *cdctest.CountValidator + stopper <-chan struct{} +} + +func newTopicConsumer( + t test.Test, + consumer sarama.Consumer, + topic string, + validator cdctest.Validator, + stopper <-chan struct{}, +) (*topicConsumer, error) { + partitions, err := consumer.Partitions(topic) if err != nil { return nil, err } + + numPartitions := len(partitions) + topicPartitions := make([]string, 0, numPartitions) + partitionConsumers := make(map[int32]sarama.PartitionConsumer, numPartitions) for _, partition := range partitions { - t.partitions = append(t.partitions, strconv.Itoa(int(partition))) - pc, err := t.ConsumePartition(topic, partition, sarama.OffsetOldest) + topicPartitions = append(topicPartitions, strconv.Itoa(int(partition))) + pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest) if err != nil { return nil, err } - t.partitionConsumers = append(t.partitionConsumers, pc) + partitionConsumers[partition] = pc + } + + if validator == nil { + validator = cdctest.NoOpValidator } - return t, nil + countValidator := cdctest.NewCountValidator(validator) + + return &topicConsumer{ + t: t, + consumer: consumer, + topic: topic, + partitions: topicPartitions, + partitionConsumers: partitionConsumers, + validator: countValidator, + stopper: stopper, + }, nil } -func (c *topicConsumer) tryNextMessage(ctx context.Context) *sarama.ConsumerMessage { - for _, pc := range c.partitionConsumers { +func (c *topicConsumer) tryNextMessage(ctx context.Context) (*sarama.ConsumerMessage, error) { + for partition, pc := range c.partitionConsumers { select { case <-ctx.Done(): - return nil + return nil, ctx.Err() case m := <-pc.Messages(): - return m + if m == nil { + return m, nil + } + if err := c.validateMessage(partition, m); err != nil { + return nil, err + } + return m, nil default: } } + return nil, nil +} + +func (c *topicConsumer) validateMessage(partition int32, m *sarama.ConsumerMessage) error { + updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value) + if err != nil { + return err + } + partitionStr := strconv.Itoa(int(partition)) + switch { + case len(m.Key) == 0: + err := c.validator.NoteResolved(partitionStr, resolved) + if err != nil { + return err + } + default: + err := c.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated) + if err != nil { + return err + } + } + if failures := c.validator.Failures(); len(failures) > 0 { + c.t.Fatalf("topic consumer for %s encountered validator error(s): %s", c.topic, strings.Join(failures, ",")) + } return nil } -func (c *topicConsumer) Next(ctx context.Context) *sarama.ConsumerMessage { - m := c.tryNextMessage(ctx) - for ; m == nil; m = c.tryNextMessage(ctx) { - if ctx.Err() != nil { - return nil +func (c *topicConsumer) next(ctx context.Context) (*sarama.ConsumerMessage, error) { + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-c.stopper: + return 
nil, nil + default: + } + m, err := c.tryNextMessage(ctx) + if err != nil { + return nil, err + } + if m != nil { + return m, nil } } - return m } -func (c *topicConsumer) Close() { +func (c *topicConsumer) close() { for _, pc := range c.partitionConsumers { pc.AsyncClose() // Drain the messages and errors as required by AsyncClose. @@ -3317,5 +3465,5 @@ func (c *topicConsumer) Close() { for range pc.Errors() { } } - _ = c.Consumer.Close() + _ = c.consumer.Close() } diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go index 7d6c76d17acd..da28013f888c 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go +++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go @@ -156,7 +156,7 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c } cmvt.kafka.manager = manager - consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable) + consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable, nil /* stopper */) if err != nil { t.Fatal(err) } @@ -166,7 +166,7 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c cmvt.kafka.consumer = consumer return func() { - cmvt.kafka.consumer.Close() + consumer.close() tearDown() } } @@ -258,15 +258,9 @@ func (cmvt *cdcMixedVersionTester) runKafkaConsumer( // context cancellation. We rely on consumer.Next() to check // the context. for { - m := cmvt.kafka.consumer.Next(ctx) - if m == nil { - // this is expected to happen once the test has finished and - // Kafka is being shut down. If it happens in the middle of - // the test, it will eventually time out, and this message - // should allow us to see that the validator finished - // earlier than it should have - l.Printf("end of changefeed") - return nil + m, err := cmvt.kafka.consumer.next(ctx) + if err != nil { + return err } // Forward resolved timetsamps to "heartbeat" that the changefeed is running.
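
For reference, a minimal sketch (illustrative only, not part of this diff) of a roachtest body that opts into the new order validation through kafkaFeedArgs. It assumes the existing cdc.go helpers referenced in the hunks above (newCDCTester, ct.Close, tpccArgs, latencyTargets, allTpccTargets, waitForWorkload); only the kafkaArgs wiring is new in this patch.

// runCDCKafkaOrderingExample is a hypothetical test body showing how a
// roachtest enables per-topic order validation via kafkaFeedArgs. Helper
// names outside this patch (newCDCTester, waitForWorkload, etc.) are the
// existing cdc.go helpers and are assumed here.
func runCDCKafkaOrderingExample(ctx context.Context, t test.Test, c cluster.Cluster) {
	ct := newCDCTester(ctx, t, c)
	defer ct.Close()

	ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})

	feed := ct.newChangefeed(feedArgs{
		sinkType: kafkaSink,
		targets:  allTpccTargets,
		kafkaArgs: kafkaFeedArgs{
			// setupSink starts one consumer per target topic; each consumer
			// wraps an order validator and fails the test on any ordering
			// violation. Setting validateOrder also adds the "updated"
			// changefeed option so rows carry the timestamps the validator
			// needs.
			validateOrder: true,
		},
		opts: map[string]string{"initial_scan": "'no'"},
	})
	ct.runFeedLatencyVerifier(feed, latencyTargets{
		initialScanLatency: 3 * time.Minute,
		steadyLatency:      10 * time.Minute,
	})
	ct.waitForWorkload()
}

Tests that manage Kafka themselves can instead take the direct path described in the commit message: set validateOrder on the kafkaManager returned by setupKafka (with its mon field populated) and call startTopicConsumers before creating the changefeed, mirroring what setupSink does in this patch.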