diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go
index d5dcf769415c..01a7542d4756 100644
--- a/pkg/ccl/changefeedccl/cdctest/nemeses.go
+++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go
@@ -187,7 +187,7 @@ func RunNemesis(
 	if err != nil {
 		return nil, err
 	}
-	ns.v = MakeCountValidator(Validators{
+	ns.v = NewCountValidator(Validators{
 		NewOrderValidator(`foo`),
 		baV,
 		fprintV,
diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go
index 3dc807737177..ca7ddf16c13d 100644
--- a/pkg/ccl/changefeedccl/cdctest/validator.go
+++ b/pkg/ccl/changefeedccl/cdctest/validator.go
@@ -142,8 +142,11 @@ func (v *orderValidator) NoteRow(partition string, key, value string, updated hl
 	})
 	seen := timestampsIdx < len(timestampValueTuples) &&
 		timestampValueTuples[timestampsIdx].ts == updated
+	if seen {
+		return nil
+	}
 
-	if !seen && len(timestampValueTuples) > 0 &&
+	if len(timestampValueTuples) > 0 &&
 		updated.Less(timestampValueTuples[len(timestampValueTuples)-1].ts) {
 		v.failures = append(v.failures, fmt.Sprintf(
 			`topic %s partition %s: saw new row timestamp %s after %s was seen`,
@@ -152,21 +155,20 @@ func (v *orderValidator) NoteRow(partition string, key, value string, updated hl
 			v.topic, partition, updated.AsOfSystemTime(),
 			timestampValueTuples[len(timestampValueTuples)-1].ts.AsOfSystemTime(),
 		))
 	}
-	if !seen && updated.Less(v.resolved[partition]) {
+	latestResolved := v.resolved[partition]
+	if updated.Less(latestResolved) {
 		v.failures = append(v.failures, fmt.Sprintf(
 			`topic %s partition %s: saw new row timestamp %s after %s was resolved`,
-			v.topic, partition, updated.AsOfSystemTime(), v.resolved[partition].AsOfSystemTime(),
+			v.topic, partition, updated.AsOfSystemTime(), latestResolved.AsOfSystemTime(),
 		))
 	}
-	if !seen {
-		v.keyTimestampAndValues[key] = append(
-			append(timestampValueTuples[:timestampsIdx], timestampValue{
-				ts:    updated,
-				value: value,
-			}),
-			timestampValueTuples[timestampsIdx:]...)
-	}
+	v.keyTimestampAndValues[key] = append(
+		append(timestampValueTuples[:timestampsIdx], timestampValue{
+			ts:    updated,
+			value: value,
+		}),
+		timestampValueTuples[timestampsIdx:]...)
 	return nil
 }
@@ -675,8 +677,8 @@ type CountValidator struct {
 	rowsSinceResolved int
 }
 
-// MakeCountValidator returns a CountValidator wrapping the given Validator.
-func MakeCountValidator(v Validator) *CountValidator {
+// NewCountValidator returns a CountValidator wrapping the given Validator.
+func NewCountValidator(v Validator) *CountValidator {
 	return &CountValidator{v: v}
 }
 
diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 9ddce19aa931..b23d985f2f28 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -59,6 +59,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/jobutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -245,17 +246,15 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
 	case pubsubSink:
 		sinkURI = changefeedccl.GcpScheme + `://cockroach-ephemeral` +
 			"?AUTH=implicit&topic_name=pubsubSink-roachtest&region=us-east1"
 	case kafkaSink:
-		kafkaNode := ct.kafkaSinkNode()
-		kafka := kafkaManager{
-			t:             ct.t,
-			c:             ct.cluster,
-			kafkaSinkNode: kafkaNode,
-			mon:           ct.mon,
+		kafka, _ := setupKafka(ct.ctx, ct.t, ct.cluster, ct.kafkaSinkNode())
+		kafka.mon = ct.mon
+		kafka.validateOrder = args.kafkaArgs.validateOrder
+
+		if err := kafka.startTopicConsumers(ct.ctx, args.targets, ct.doneCh); err != nil {
+			ct.t.Fatal(err)
 		}
-		kafka.install(ct.ctx)
-		kafka.start(ct.ctx, "kafka")
-		if args.kafkaChaos {
+		if args.kafkaArgs.kafkaChaos {
 			ct.mon.Go(func(ctx context.Context) error {
 				period, downTime := 2*time.Minute, 20*time.Second
 				return kafka.chaosLoop(ctx, period, downTime, ct.doneCh)
@@ -516,14 +515,23 @@ type feedArgs struct {
 	sinkType        sinkType
 	targets         []string
 	opts            map[string]string
-	kafkaChaos      bool
 	assumeRole      string
 	tolerateErrors  bool
 	sinkURIOverride string
 	cdcFeatureFlags
+	kafkaArgs kafkaFeedArgs
+}
+
+// kafkaFeedArgs are args that are specific to kafkaSink changefeeds.
+type kafkaFeedArgs struct {
+	// If kafkaChaos is true, the Kafka cluster will periodically restart
+	// to simulate unreliability.
+	kafkaChaos bool
+	// If validateOrder is set to true, order validators will be created
+	// for each topic to validate the changefeed's ordering guarantees.
+	validateOrder bool
 }
 
-// TODO: Maybe move away from feedArgs since its only 3 things
 func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
 	ct.t.Status(fmt.Sprintf("initiating %s sink", args.sinkType))
 	var sinkURI string
@@ -548,6 +556,9 @@ func (ct *cdcTester) newChangefeed(args feedArgs) changefeedJob {
 	} else {
 		feedOptions["resolved"] = ""
 	}
+	if args.kafkaArgs.validateOrder {
+		feedOptions["updated"] = ""
+	}
 
 	for option, value := range args.opts {
 		feedOptions[option] = value
@@ -784,11 +795,11 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 		t.Fatal(err)
 	}
 
-	tc, err := kafka.newConsumer(ctx, "bank")
+	tc, err := kafka.newConsumer(ctx, "bank", nil /* stopper */)
 	if err != nil {
 		t.Fatal(errors.Wrap(err, "could not create kafka consumer"))
 	}
-	defer tc.Close()
+	defer tc.close()
 
 	l, err := t.L().ChildLogger(`changefeed`)
 	if err != nil {
@@ -815,11 +826,11 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 	m.Go(func(ctx context.Context) error {
 		defer workloadCancel()
 		defer func() { close(messageBuf) }()
-		v := cdctest.MakeCountValidator(cdctest.NoOpValidator)
+		v := cdctest.NewCountValidator(cdctest.NoOpValidator)
 		for {
-			m := tc.Next(ctx)
-			if m == nil {
-				return fmt.Errorf("unexpected end of changefeed")
+			m, err := tc.next(ctx)
+			if err != nil {
+				return err
 			}
 			messageBuf <- m
 			updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
@@ -866,7 +877,7 @@ func runCDCBank(ctx context.Context, t test.Test, c cluster.Cluster) {
 			fprintV,
 			baV,
 		}
-		v := cdctest.MakeCountValidator(validators)
+		v := cdctest.NewCountValidator(validators)
 		timeSpentValidatingRows := 0 * time.Second
 		numRowsValidated := 0
 
@@ -1366,7 +1377,10 @@ func registerCDC(r registry.Registry) {
 			feed := ct.newChangefeed(feedArgs{
 				sinkType: kafkaSink,
 				targets:  allTpccTargets,
-				opts:     map[string]string{"initial_scan": "'no'"},
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
+				opts: map[string]string{"initial_scan": "'no'"},
 			})
 			ct.runFeedLatencyVerifier(feed, latencyTargets{
 				initialScanLatency: 3 * time.Minute,
@@ -1528,7 +1542,7 @@ func registerCDC(r registry.Registry) {
 		},
 	})
 	r.Add(registry.TestSpec{
-		Name:      "cdc/sink-chaos",
+		Name:      "cdc/kafka-chaos",
 		Owner:     `cdc`,
 		Benchmark: true,
 		Cluster:   r.MakeClusterSpec(4, spec.CPU(16)),
@@ -1543,10 +1557,13 @@ func registerCDC(r registry.Registry) {
 			ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m"})
 
 			feed := ct.newChangefeed(feedArgs{
-				sinkType:   kafkaSink,
-				targets:    allTpccTargets,
-				kafkaChaos: true,
-				opts:       map[string]string{"initial_scan": "'no'"},
+				sinkType: kafkaSink,
+				targets:  allTpccTargets,
+				kafkaArgs: kafkaFeedArgs{
+					kafkaChaos:    true,
+					validateOrder: true,
+				},
+				opts: map[string]string{"initial_scan": "'no'"},
 			})
 			ct.runFeedLatencyVerifier(feed, latencyTargets{
 				initialScanLatency: 3 * time.Minute,
@@ -1573,8 +1590,11 @@ func registerCDC(r registry.Registry) {
 			ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "30m", tolerateErrors: true})
 
 			feed := ct.newChangefeed(feedArgs{
-				sinkType: kafkaSink,
-				targets:  allTpccTargets,
+				sinkType: kafkaSink,
+				targets:  allTpccTargets,
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
 				opts:           map[string]string{"initial_scan": "'no'"},
 				tolerateErrors: true,
 			})
@@ -1610,7 +1630,13 @@ func registerCDC(r registry.Registry) {
 				t.Fatalf("failed statement %q: %s", alterStmt, err.Error())
 			}
 
-			feed := ct.newChangefeed(feedArgs{sinkType: kafkaSink, targets: allLedgerTargets})
+			feed := ct.newChangefeed(feedArgs{
+				sinkType: kafkaSink,
+				targets:  allLedgerTargets,
+				kafkaArgs: kafkaFeedArgs{
+					validateOrder: true,
+				},
+			})
 			ct.runFeedLatencyVerifier(feed, latencyTargets{
 				initialScanLatency: 10 * time.Minute,
 				steadyLatency:      time.Minute,
@@ -2360,6 +2386,10 @@ type kafkaManager struct {
 
 	// Our method of requiring OAuth on the broker only works with Kafka 2
 	useKafka2 bool
+
+	// validateOrder specifies whether consumers created by the
+	// kafkaManager should create and use order validators.
+	validateOrder bool
 }
 
 func (k kafkaManager) basePath() string {
@@ -2875,11 +2905,11 @@ func (k kafkaManager) stop(ctx context.Context) {
 }
 
 func (k kafkaManager) chaosLoop(
-	ctx context.Context, period, downTime time.Duration, stopper chan struct{},
+	ctx context.Context, period, downTime time.Duration, stopper <-chan struct{},
 ) error {
 	t := time.NewTicker(period)
 	defer t.Stop()
-	for {
+	for i := 0; ; i++ {
 		select {
 		case <-stopper:
 			return nil
@@ -2888,6 +2918,7 @@
 		case <-t.C:
 		}
 
+		k.t.L().Printf("kafka chaos loop iteration %d: stopping", i)
 		k.stop(ctx)
 
 		select {
@@ -2898,6 +2929,7 @@
 		case <-time.After(downTime):
 		}
 
+		k.t.L().Printf("kafka chaos loop iteration %d: restarting", i)
 		k.restart(ctx, "kafka")
 	}
 }
@@ -2993,7 +3025,9 @@ func (k kafkaManager) createTopic(ctx context.Context, topic string) error {
 	})
 }
 
-func (k kafkaManager) newConsumer(ctx context.Context, topic string) (*topicConsumer, error) {
+func (k kafkaManager) newConsumer(
+	ctx context.Context, topic string, stopper <-chan struct{},
+) (*topicConsumer, error) {
 	kafkaAddrs := []string{k.consumerURL(ctx)}
 	config := sarama.NewConfig()
 	// I was seeing "error processing FetchRequest: kafka: error decoding
@@ -3007,14 +3041,66 @@
 	if err != nil {
 		return nil, err
 	}
-	tc, err := makeTopicConsumer(consumer, topic)
+	var validator cdctest.Validator
+	if k.validateOrder {
+		validator = cdctest.NewOrderValidator(topic)
+	}
+	tc, err := newTopicConsumer(k.t, consumer, topic, validator, stopper)
 	if err != nil {
 		_ = consumer.Close()
 		return nil, err
 	}
+	k.t.L().Printf("topic consumer for %s has partitions: %s", topic, tc.partitions)
 	return tc, nil
 }
 
+func (k kafkaManager) startTopicConsumers(
+	ctx context.Context, targets []string, stopper <-chan struct{},
+) error {
+	for _, target := range targets {
+		// We need to strip off the database and schema name since the topic name
+		// is just the table name.
+		topic := target
+		lastPeriodIndex := strings.LastIndex(target, ".")
+		if lastPeriodIndex != -1 {
+			topic = topic[lastPeriodIndex+1:]
+		}
+
+		k.t.L().Printf("starting topic consumer for topic: %s", topic)
+		k.mon.Go(func(ctx context.Context) error {
+			topicConsumer, err := k.newConsumer(ctx, topic, stopper)
+			if err != nil {
+				return err
+			}
+			defer topicConsumer.close()
+			everyN := log.Every(30 * time.Second)
+			for {
+				select {
+				case <-stopper:
+					return nil
+				case <-ctx.Done():
+					return ctx.Err()
+				default:
+				}
+
+				// The topicConsumer has order validation built-in so this has
+				// the side effect of validating the order of incoming messages.
+				_, err := topicConsumer.next(ctx)
+				if err != nil {
+					k.t.L().Printf("topic consumer for %s encountered error: %s", topic, err)
+					return err
+				}
+				if everyN.ShouldLog() {
+					k.t.L().Printf("topic consumer for %s validated %d rows and %d resolved timestamps",
+						topic, topicConsumer.validator.NumRows, topicConsumer.validator.NumResolved)
+				}
+			}
+		})
+	}
+
+	return nil
+}
+
 type tpccWorkload struct {
 	workloadNodes      option.NodeListOption
 	sqlNodes           option.NodeListOption
@@ -3240,7 +3326,7 @@ func getChangefeedInfo(db *gosql.DB, jobID int) (*changefeedInfo, error) {
 // nightly, but nice for development.
 func stopFeeds(db *gosql.DB) {
 	_, _ = db.Exec(`CANCEL JOBS (
-		SELECT job_id FROM [SHOW JOBS] WHERE status = 'running'
+		SELECT job_id FROM [SHOW CHANGEFEED JOBS] WHERE status = 'running'
 	)`)
 }
 
@@ -3270,54 +3356,118 @@ func setupKafka(
 }
 
 type topicConsumer struct {
-	sarama.Consumer
-
+	t        test.Test
+	consumer sarama.Consumer
 	topic              string
 	partitions         []string
-	partitionConsumers []sarama.PartitionConsumer
-}
-
-func makeTopicConsumer(c sarama.Consumer, topic string) (*topicConsumer, error) {
-	t := &topicConsumer{Consumer: c, topic: topic}
-	partitions, err := t.Partitions(t.topic)
+	partitionConsumers map[int32]sarama.PartitionConsumer
+	validator          *cdctest.CountValidator
+	stopper            <-chan struct{}
+}
+
+func newTopicConsumer(
+	t test.Test,
+	consumer sarama.Consumer,
+	topic string,
+	validator cdctest.Validator,
+	stopper <-chan struct{},
+) (*topicConsumer, error) {
+	partitions, err := consumer.Partitions(topic)
 	if err != nil {
 		return nil, err
 	}
+
+	numPartitions := len(partitions)
+	topicPartitions := make([]string, 0, numPartitions)
+	partitionConsumers := make(map[int32]sarama.PartitionConsumer, numPartitions)
 	for _, partition := range partitions {
-		t.partitions = append(t.partitions, strconv.Itoa(int(partition)))
-		pc, err := t.ConsumePartition(topic, partition, sarama.OffsetOldest)
+		topicPartitions = append(topicPartitions, strconv.Itoa(int(partition)))
+		pc, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
 		if err != nil {
 			return nil, err
 		}
-		t.partitionConsumers = append(t.partitionConsumers, pc)
+		partitionConsumers[partition] = pc
+	}
+
+	if validator == nil {
+		validator = cdctest.NoOpValidator
 	}
-	return t, nil
+	countValidator := cdctest.NewCountValidator(validator)
+
+	return &topicConsumer{
+		t:                  t,
+		consumer:           consumer,
+		topic:              topic,
+		partitions:         topicPartitions,
+		partitionConsumers: partitionConsumers,
+		validator:          countValidator,
+		stopper:            stopper,
+	}, nil
 }
 
-func (c *topicConsumer) tryNextMessage(ctx context.Context) *sarama.ConsumerMessage {
-	for _, pc := range c.partitionConsumers {
+func (c *topicConsumer) tryNextMessage(ctx context.Context) (*sarama.ConsumerMessage, error) {
+	for partition, pc := range c.partitionConsumers {
 		select {
 		case <-ctx.Done():
-			return nil
+			return nil, ctx.Err()
 		case m := <-pc.Messages():
-			return m
+			if m == nil {
+				return m, nil
+			}
+			if err := c.validateMessage(partition, m); err != nil {
+				return nil, err
+			}
+			return m, nil
 		default:
 		}
 	}
+	return nil, nil
+}
+
+func (c *topicConsumer) validateMessage(partition int32, m *sarama.ConsumerMessage) error {
+	updated, resolved, err := cdctest.ParseJSONValueTimestamps(m.Value)
+	if err != nil {
+		return err
+	}
+	partitionStr := strconv.Itoa(int(partition))
+	switch {
+	case len(m.Key) == 0:
+		err := c.validator.NoteResolved(partitionStr, resolved)
+		if err != nil {
+			return err
+		}
+	default:
+		err := c.validator.NoteRow(partitionStr, string(m.Key), string(m.Value), updated)
+		if err != nil {
+			return err
+		}
+	}
+	if failures := c.validator.Failures(); len(failures) > 0 {
+		c.t.Fatalf("topic consumer for %s encountered validator error(s): %s", c.topic, strings.Join(failures, ","))
+	}
 	return nil
 }
 
-func (c *topicConsumer) Next(ctx context.Context) *sarama.ConsumerMessage {
-	m := c.tryNextMessage(ctx)
-	for ; m == nil; m = c.tryNextMessage(ctx) {
-		if ctx.Err() != nil {
-			return nil
+func (c *topicConsumer) next(ctx context.Context) (*sarama.ConsumerMessage, error) {
+	for {
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case <-c.stopper:
+			return nil, nil
+		default:
+		}
+		m, err := c.tryNextMessage(ctx)
+		if err != nil {
+			return nil, err
+		}
+		if m != nil {
+			return m, nil
 		}
 	}
-	return m
 }
 
-func (c *topicConsumer) Close() {
+func (c *topicConsumer) close() {
 	for _, pc := range c.partitionConsumers {
 		pc.AsyncClose()
 		// Drain the messages and errors as required by AsyncClose.
@@ -3326,5 +3476,5 @@
 		for range pc.Errors() {
 		}
 	}
-	_ = c.Consumer.Close()
+	_ = c.consumer.Close()
 }
diff --git a/pkg/cmd/roachtest/tests/mixed_version_cdc.go b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
index 2dea11a21c2e..8b3be7355bc5 100644
--- a/pkg/cmd/roachtest/tests/mixed_version_cdc.go
+++ b/pkg/cmd/roachtest/tests/mixed_version_cdc.go
@@ -156,7 +156,7 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c
 	}
 
 	cmvt.kafka.manager = manager
-	consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable)
+	consumer, err := cmvt.kafka.manager.newConsumer(cmvt.ctx, targetTable, nil /* stopper */)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -166,7 +166,7 @@ func (cmvt *cdcMixedVersionTester) StartKafka(t test.Test, c cluster.Cluster) (c
 	cmvt.kafka.consumer = consumer
 
 	return func() {
-		cmvt.kafka.consumer.Close()
+		consumer.close()
 		tearDown()
 	}
 }
@@ -241,7 +241,7 @@ func (cmvt *cdcMixedVersionTester) setupValidator(
 		fprintV,
 	}
 
-	cmvt.validator = cdctest.MakeCountValidator(validators)
+	cmvt.validator = cdctest.NewCountValidator(validators)
 	cmvt.fprintV = fprintV
 	return nil
 }
@@ -258,15 +258,9 @@ func (cmvt *cdcMixedVersionTester) runKafkaConsumer(
 	// context cancellation. We rely on consumer.Next() to check
 	// the context.
 	for {
-		m := cmvt.kafka.consumer.Next(ctx)
-		if m == nil {
-			// this is expected to happen once the test has finished and
-			// Kafka is being shut down. If it happens in the middle of
-			// the test, it will eventually time out, and this message
-			// should allow us to see that the validator finished
-			// earlier than it should have
-			l.Printf("end of changefeed")
-			return nil
+		m, err := cmvt.kafka.consumer.next(ctx)
+		if err != nil {
+			return err
 		}
 
 		// Forward resolved timetsamps to "heartbeat" that the changefeed is running.
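
---

For review context (not part of the patch): below is a minimal sketch of how the validator plumbing introduced above composes, assuming only the cdctest APIs that appear in this diff (NewOrderValidator, NewCountValidator, NoteRow, NoteResolved, Failures, ParseJSONValueTimestamps). The helper name validateKafkaMessage and the sample payload are hypothetical stand-ins for the sarama messages the roachtest consumers receive.

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
)

// validateKafkaMessage mirrors topicConsumer.validateMessage in the patch: a
// message with an empty key carries a resolved timestamp; anything else is a
// row update and is fed to the order validator keyed by partition.
func validateKafkaMessage(v *cdctest.CountValidator, partition int32, key, value []byte) error {
	updated, resolved, err := cdctest.ParseJSONValueTimestamps(value)
	if err != nil {
		return err
	}
	p := strconv.Itoa(int(partition))
	if len(key) == 0 {
		return v.NoteResolved(p, resolved)
	}
	return v.NoteRow(p, string(key), string(value), updated)
}

func main() {
	// One order validator per topic, wrapped in a count validator so callers
	// can report progress (NumRows / NumResolved) while ordering is checked.
	v := cdctest.NewCountValidator(cdctest.NewOrderValidator("bank"))

	// In the roachtest the key/value pairs come from sarama partition
	// consumers; this payload is a hypothetical placeholder.
	if err := validateKafkaMessage(v, 0, []byte(`[1]`), []byte(`{"after": {"id": 1}, "updated": "1.0000000000"}`)); err != nil {
		fmt.Println("validation error:", err)
	}

	if failures := v.Failures(); len(failures) > 0 {
		fmt.Println("ordering violations:", failures)
	}
	fmt.Printf("validated %d rows and %d resolved timestamps\n", v.NumRows, v.NumResolved)
}
```

This is the same division of labor the new topicConsumer uses: parsing plus NoteRow/NoteResolved happen per message, while Failures() is polled so ordering violations surface as test failures instead of being silently dropped.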