diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index ec33bf80f9bf..48d02ad0174f 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -59,6 +59,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -242,17 +243,15 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
 	case pubsubSink:
 		sinkURI = changefeedccl.GcpScheme + `://cockroach-ephemeral` + "?AUTH=implicit&topic_name=pubsubSink-roachtest&region=us-east1"
 	case kafkaSink:
-		kafkaNode := ct.kafkaSinkNode()
-		kafka := kafkaManager{
-			t:             ct.t,
-			c:             ct.cluster,
-			kafkaSinkNode: kafkaNode,
-			mon:           ct.mon,
+		kafka, _ := setupKafka(ct.ctx, ct.t, ct.cluster, ct.kafkaSinkNode())
+		kafka.mon = ct.mon
+		kafka.validateOrder = args.kafkaArgs.validateOrder
+
+		if err := kafka.startTopicConsumers(ct.ctx, args.targets, ct.doneCh); err != nil {
+			ct.t.Fatal(err)
 		}
-		kafka.install(ct.ctx)
-		kafka.start(ct.ctx, "kafka")
-		if args.kafkaChaos {
+		if args.kafkaArgs.kafkaChaos {
 			ct.mon.Go(func(ctx context.Context) error {
 				period, downTime := 2*time.Minute, 20*time.Second
 				return kafka.chaosLoop(ctx, period, downTime, ct.doneCh)
@@ -513,14 +512,23 @@ type feedArgs struct {
 	sinkType        sinkType
 	targets         []string
 	opts            map[string]string
-	kafkaChaos      bool
 	assumeRole      string
 	tolerateErrors  bool
 	sinkURIOverride string
 	cdcFeatureFlags
+	kafkaArgs       kafkaFeedArgs
+}
+
+// kafkaFeedArgs are args that are specific to kafkaSink changefeeds.
+type kafkaFeedArgs struct {
+	// If kafkaChaos is true, the Kafka cluster will periodically restart
+	// to simulate unreliability.
+	kafkaChaos bool
+	// If validateOrder is set to true, order validators will be created
+	// for each topic to validate the changefeed's ordering guarantees.
+	validateOrder bool
 }
 
-// TODO: Maybe move away from feedArgs since its only 3 things
 func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
 	ct.t.Status(fmt.Sprintf("initiating %s sink", args.sinkType))
 	var sinkURI string
@@ -545,6 +553,9 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
 	} else {
 		feedOptions["resolved"] = ""
 	}
+	if args.kafkaArgs.validateOrder {
+		feedOptions["updated"] = ""
+	}
 
 	for option, value := range args.opts {
 		feedOptions[option] = value
@@ -781,11 +792,11 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 		t.Fatal(err)
 	}
 
-	tc, err := kafka.newConsumer(ctx, "bank")
+	tc, err := kafka.newConsumer(ctx, "bank", nil /* stopper */)
 	if err != nil {
 		t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
 	}
-	defer tc.Close()
+	defer tc.close()
 
 	l, err := t.L().ChildLogger(`changefeed`)
 	if err != nil {
@@ -814,9 +825,9 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 		defer func() { close(messageBuf) }()
 		v := cdctest.NewCountValidator(cdctest.NoOpValidator)
 		for {
-			m := tc.Next(ctx)
-			if m == nil {
-				return fmt.Errorf("unexpected end of changefeed")
+			m, err := tc.next(ctx)
+			if err != nil {
+				return err
 			}
 			messageBuf <- m
 			updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
@@ -1363,7 +1374,10 @@ func registerCDC(r registry.Registry) {
 			feed := ct.newChangefeed(feedArgs{
 				sinkType: kafkaSink,
 				targets:  allTpccTargets,
-				opts:     map[string]string{"initial_scan": "'no'"},
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
+				opts: map[string]string{"initial_scan": "'no'"},
 			})
 			ct.runFeedLatencyVerifier(feed, latencyTargets{
 				initialScanLatency: 3 * time.Minute,
@@ -1540,10 +1554,13 @@ func registerCDC(r registry.Registry) {
 			ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})
 
 			feed := ct.newChangefeed(feedArgs{
-				sinkType:   kafkaSink,
-				targets:    allTpccTargets,
-				kafkaChaos: true,
-				opts:       map[string]string{"initial_scan": "'no'"},
+				sinkType: kafkaSink,
+				targets:  allTpccTargets,
+				kafkaArgs: kafkaFeedArgs{
+					kafkaChaos:    true,
+					validateOrder: true,
+				},
+				opts: map[string]string{"initial_scan": "'no'"},
 			})
 			ct.runFeedLatencyVerifier(feed, latencyTargets{
 				initialScanLatency: 3 * time.Minute,
@@ -1570,8 +1587,11 @@ func registerCDC(r registry.Registry) {
 			ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m", tolerateErrors: true})
 
 			feed := ct.newChangefeed(feedArgs{
-				sinkType:       kafkaSink,
-				targets:        allTpccTargets,
+				sinkType: kafkaSink,
+				targets:  allTpccTargets,
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
 				opts:           map[string]string{"initial_scan": "'no'"},
 				tolerateErrors: true,
 			})
@@ -1607,7 +1627,13 @@ func registerCDC(r registry.Registry) {
 				t.Fatalf("failed statement %q: %s", alterStmt, err.Error())
 			}
 
-			feed := ct.newChangefeed(feedArgs{sinkType: kafkaSink, targets: allLedgerTargets})
+			feed := ct.newChangefeed(feedArgs{
+				sinkType: kafkaSink,
+				targets:  allLedgerTargets,
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
+			})
 			ct.runFeedLatencyVerifier(feed, latencyTargets{
 				initialScanLatency: 10 * time.Minute,
 				steadyLatency:      time.Minute,
@@ -2357,6 +2383,10 @@ type kafkaManager struct {
 
 	// Our method of requiring OAuth on the broker only works with Kafka 2
 	useKafka2 bool
+
+	// validateOrder specifies whether consumers created by the
+	// kafkaManager should create and use order validators.
+	validateOrder bool
 }
 
 func (k kafkaManager) basePath() string {
@@ -2984,7 +3014,9 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
 	})
 }
 
-func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
+func (k kafkaManager) newConsumer(
+	ctx context.Context, topic string, stopper <-chan struct{},
+) (*topicConsumer, error) {
 	kafkaAddrs := []string{k.consumerURL(ctx)}
 	config := sarama.NewConfig()
 	// I was seeing "error processing FetchRequest: kafka: error decoding
@@ -2998,14 +3030,66 @@ func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicCons
 	if err != nil {
 		return nil, err
 	}
-	tc, err := makeTopicConsumer(consumer, topic)
+	var validator cdctest.Validator
+	if k.validateOrder {
+		validator = cdctest.NewOrderValidator(topic)
+	}
+	tc, err := newTopicConsumer(k.t, consumer, topic, validator, stopper)
 	if err != nil {
 		_ = consumer.Close()
 		return nil, err
 	}
+	k.t.L().Printf("topic consumer for %s has partitions: %s", topic, tc.partitions)
 	return tc, nil
 }
 
+func (k kafkaManager) startTopicConsumers(
+	ctx context.Context, targets []string, stopper <-chan struct{},
+) error {
+	for _, target := range targets {
+		// We need to strip off the database and schema name since the topic name
+		// is just the table name.
+		topic := target
+		lastPeriodIndex := strings.LastIndex(target, ".")
+		if lastPeriodIndex != -1 {
+			topic = topic[lastPeriodIndex+1:]
+		}
+
+		k.t.L().Printf("starting topic consumer for topic: %s", topic)
+		k.mon.Go(func(ctx context.Context) error {
+			topicConsumer, err := k.newConsumer(ctx, topic, stopper)
+			if err != nil {
+				return err
+			}
+			defer topicConsumer.close()
+			everyN := log.Every(30 * time.Second)
+			for {
+				select {
+				case <-stopper:
+					return nil
+				case <-ctx.Done():
+					return ctx.Err()
+				default:
+				}
+
+				// The topicConsumer has order validation built-in so this has
+				// the side effect of validating the order of incoming messages.
+				_, err := topicConsumer.next(ctx)
+				if err != nil {
+					k.t.L().Printf("topic consumer for %s encountered error: %s", topic, err)
+					return err
+				}
+				if everyN.ShouldLog() {
+					k.t.L().Printf("topic consumer for %s validated %d rows and %d resolved timestamps",
+						topic, topicConsumer.validator.NumRows, topicConsumer.validator.NumResolved)
+				}
+			}
+		})
+	}
+
+	return nil
+}
+
 type tpccWorkload struct {
 	workloadNodes      option.NodeListOption
 	sqlNodes           option.NodeListOption
@@ -3231,7 +3315,7 @@ func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) {
 // nightly, but nice for development.
 func stopFeeds(db *gosql.DB) {
 	_, _ = db.Exec(`CANCEL JOBS (
-		SELECT job_id FROM [SHOW JOBS] WHERE status = 'running'
+		SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = 'running'
 	)`)
 }
 
@@ -3261,54 +3345,118 @@ func setupKafka(
 }
 
 type topicConsumer struct {
-	sarama.Consumer
-
+	t                  test.Test
+	consumer           sarama.Consumer
 	topic              string
 	partitions         []string
-	partitionConsumers []sarama.PartitionConsumer
-}
-
-func makeTopicConsumer(c sarama.Consumer, topic string) (*topicConsumer, error) {
-	t := &topicConsumer{Consumer: c, topic: topic}
-	partitions, err := t.Partitions(t.topic)
+	partitionConsumers map[int32]sarama.PartitionConsumer
+	validator          *cdctest.CountValidator
+	stopper            <-chan struct{}
+}
+
+func newTopicConsumer(
+	t test.Test,
+	consumer sarama.Consumer,
+	topic string,
+	validator cdctest.Validator,
+	stopper <-chan struct{},
+) (*topicConsumer, error) {
+	partitions, err := consumer.Partitions(topic)
 	if err != nil {
 		return nil, err
 	}
+
+	numPartitions := len(partitions)
+	topicPartitions := make([]string, 0, numPartitions)
+	partitionConsumers := make(map[int32]sarama.PartitionConsumer, numPartitions)
 	for _, partition := range partitions {
-		t.partitions = append(t.partitions, strconv.Itoa(int(partition)))
-		pc, err := t.ConsumePartition(topic, partition, sarama.OffsetOldest)
+		topicPartitions = append(topicPartitions, strconv.Itoa(int(partition)))
+		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
 		if err != nil {
 			return nil, err
 		}
-		t.partitionConsumers = append(t.partitionConsumers, pc)
+		partitionConsumers[partition] = pc
+	}
+
+	if validator == nil {
+		validator = cdctest.NoOpValidator
 	}
-	return t, nil
+	countValidator := cdctest.NewCountValidator(validator)
+
+	return &topicConsumer{
+		t:                  t,
+		consumer:           consumer,
+		topic:              topic,
+		partitions:         topicPartitions,
+		partitionConsumers: partitionConsumers,
+		validator:          countValidator,
+		stopper:            stopper,
+	}, nil
 }
 
-func (c *topicConsumer) tryNextMessage(ctx context.Context) *sarama.ConsumerMessage {
-	for _, pc := range c.partitionConsumers {
+func (c *topicConsumer) tryNextMessage(ctx context.Context) (*sarama.ConsumerMessage, error) {
+	for partition, pc := range c.partitionConsumers {
 		select {
 		case <-ctx.Done():
-			return nil
+			return nil, ctx.Err()
 		case m := <-pc.Messages():
-			return m
+			if m == nil {
+				return m, nil
+			}
+			if err := c.validateMessage(partition, m); err != nil {
+				return nil, err
+			}
+			return m, nil
 		default:
 		}
 	}
+	return nil, nil
+}
+
+func (c *topicConsumer) validateMessage(partition int32, m *sarama.ConsumerMessage) error {
+	updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
+	if err != nil {
+		return err
+	}
+	partitionStr := strconv.Itoa(int(partition))
+	switch {
+	case len(m.Key) == 0:
+		err := c.validator.NoteResolved(partitionStr, resolved)
+		if err != nil {
+			return err
+		}
+	default:
+		err := c.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated)
+		if err != nil {
+			return err
+		}
+	}
+	if failures := c.validator.Failures(); len(failures) > 0 {
+		c.t.Fatalf("topic consumer for %s encountered validator error(s): %s", c.topic, strings.Join(failures, ","))
+	}
 	return nil
 }
 
-func (c *topicConsumer) Next(ctx context.Context) *sarama.ConsumerMessage {
-	m := c.tryNextMessage(ctx)
-	for ; m == nil; m = c.tryNextMessage(ctx) {
-		if ctx.Err() != nil {
-			return nil
+func (c *topicConsumer) next(ctx context.Context) (*sarama.ConsumerMessage, error) {
+	for {
+		select {
		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-c.stopper:
+			return nil, nil
+		default:
+		}
+		m, err := c.tryNextMessage(ctx)
+		if err != nil {
+			return nil, err
+		}
+		if m != nil {
+			return m, nil
 		}
 	}
-	return m
 }
 
-func (c *topicConsumer) Close() {
+func (c *topicConsumer) close() {
 	for _, pc := range c.partitionConsumers {
 		pc.AsyncClose()
 		// Drain the messages and errors as required by AsyncClose.
@@ -3317,5 +3465,5 @@ func (c *topicConsumer) Close() {
 		for range pc.Errors() {
 		}
 	}
-	_ = c.Consumer.Close()
+	_ = c.consumer.Close()
 }
diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
index 7d6c76d17acd..da28013f888c 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -156,7 +156,7 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c
 	}
 
 	cmvt.kafka.manager = manager
-	consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable)
+	consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable, nil /* stopper */)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -166,7 +166,7 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c
 	cmvt.kafka.consumer = consumer
 
 	return func() {
-		cmvt.kafka.consumer.Close()
+		consumer.close()
 		tearDown()
 	}
 }
@@ -258,15 +258,9 @@ func (cmvt *cdcMixedVersionTester) runKafkaConsumer(
 	// context cancellation. We rely on consumer.Next() to check
 	// the context.
 	for {
-		m := cmvt.kafka.consumer.Next(ctx)
-		if m == nil {
-			// this is expected to happen once the test has finished and
-			// Kafka is being shut down. If it happens in the middle of
-			// the test, it will eventually time out, and this message
-			// should allow us to see that the validator finished
-			// earlier than it should have
-			l.Printf("end of changefeed")
-			return nil
+		m, err := cmvt.kafka.consumer.next(ctx)
+		if err != nil {
+			return err
 		}
 
 		// Forward resolved timetsamps to "heartbeat" that the changefeed is running.