From c3fffbf9350c2d20cb9c6def3aaa3459b81739ab Mon Sep 17 00:00:00 2001 From: Aaron Zinger Date: Wed, 1 Jun 2022 17:05:48 -0400 Subject: [PATCH] changefeedccl: correctly handle old-style protobufs in rowfetcher_cache Fixes https://github.com/cockroachdb/cockroach/issues/82309. Release note (bug fix): Fixed a bug where changefeeds created before upgrading to 22.1 would silently fail to emit any data other than resolved timestamps. --- pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/bench_test.go | 5 +- .../changefeedccl/cdceval/expr_eval_test.go | 34 ++-- pkg/ccl/changefeedccl/cdcevent/BUILD.bazel | 1 - pkg/ccl/changefeedccl/cdcevent/event.go | 18 +- pkg/ccl/changefeedccl/cdcevent/event_test.go | 34 ++-- .../cdcevent/rowfetcher_cache.go | 7 +- .../changefeedccl/changefeed_processors.go | 11 +- pkg/ccl/changefeedccl/changefeed_stmt.go | 4 +- pkg/ccl/changefeedccl/event_processing.go | 13 +- .../changefeedccl/event_processing_test.go | 168 ++++++++++++++++++ 11 files changed, 241 insertions(+), 55 deletions(-) create mode 100644 pkg/ccl/changefeedccl/event_processing_test.go diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index bba3a8531b5f..cd41fc5367e9 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -137,6 +137,7 @@ go_test( "bench_test.go", "changefeed_test.go", "encoder_test.go", + "event_processing_test.go", "helpers_tenant_shim_test.go", "helpers_test.go", "main_test.go", diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 017001f46c32..835e1dfc7fb9 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -252,8 +252,11 @@ func createBenchmarkChangefeed( return nil, nil, err } serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig - eventConsumer := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater, + eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, sf, initialHighWater, sink, encoder, details, TestingKnobs{}, nil) + if err != nil { + return nil, nil, err + } tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) { event, err := buf.Get(ctx) if err != nil { diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index 605783c8c1df..9d04a025d3b4 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -52,16 +52,16 @@ func TestNoopPredicate(t *testing.T) { serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig ctx := context.Background() - decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, - jobspb.ChangefeedDetails{ - TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ - { - Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, - TableID: desc.GetID(), - FamilyName: "most", - }, + decoder, err := cdcevent.NewEventDecoder( + ctx, &serverCfg, + []jobspb.ChangefeedTargetSpecification{ + { + Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, + TableID: desc.GetID(), + FamilyName: "most", }, - }) + }, false) + require.NoError(t, err) popRow, cleanup := cdctest.MakeRangeFeedValueReader(t, s.ExecutorConfig(), desc) defer cleanup() @@ -452,18 +452,18 @@ CREATE TABLE foo ( targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY } - details := jobspb.ChangefeedDetails{ - TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ - { - Type: targetType, - TableID: desc.GetID(), - FamilyName: tc.familyName, - }, + targets := []jobspb.ChangefeedTargetSpecification{ + { + Type: targetType, + TableID: desc.GetID(), + FamilyName: tc.familyName, }, } + serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig ctx := context.Background() - decoder := cdcevent.NewEventDecoder(ctx, &serverCfg, details) + decoder, err := cdcevent.NewEventDecoder(ctx, &serverCfg, targets, false) + require.NoError(t, err) for _, action := range tc.setupActions { sqlDB.Exec(t, action) diff --git a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel index 3baddb22699e..52cc7dc9763b 100644 --- a/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcevent/BUILD.bazel @@ -46,7 +46,6 @@ go_test( deps = [ "//pkg/base", "//pkg/ccl/changefeedccl/cdctest", - "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/utilccl", "//pkg/jobs/jobspb", "//pkg/roachpb", diff --git a/pkg/ccl/changefeedccl/cdcevent/event.go b/pkg/ccl/changefeedccl/cdcevent/event.go index 1ca6bdae6d0d..a54d968c95f3 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event.go +++ b/pkg/ccl/changefeedccl/cdcevent/event.go @@ -12,7 +12,6 @@ import ( "context" "fmt" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -346,18 +345,23 @@ func getEventDescriptorCached( // NewEventDecoder returns key value decoder. func NewEventDecoder( - ctx context.Context, cfg *execinfra.ServerConfig, details jobspb.ChangefeedDetails, -) Decoder { - rfCache := newRowFetcherCache( + ctx context.Context, + cfg *execinfra.ServerConfig, + targets []jobspb.ChangefeedTargetSpecification, + includeVirtual bool, +) (Decoder, error) { + rfCache, err := newRowFetcherCache( ctx, cfg.Codec, cfg.LeaseManager.(*lease.Manager), cfg.CollectionFactory, cfg.DB, - details.TargetSpecifications, + targets, ) + if err != nil { + return nil, err + } - includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull) eventDescriptorCache := cache.NewUnorderedCache(defaultCacheConfig) getEventDescriptor := func( desc catalog.TableDescriptor, @@ -370,7 +374,7 @@ func NewEventDecoder( return &eventDecoder{ getEventDescriptor: getEventDescriptor, rfCache: rfCache, - } + }, nil } // DecodeKV decodes key value at specified schema timestamp. diff --git a/pkg/ccl/changefeedccl/cdcevent/event_test.go b/pkg/ccl/changefeedccl/cdcevent/event_test.go index 75ed3f4e78a1..3118341d6782 100644 --- a/pkg/ccl/changefeedccl/cdcevent/event_test.go +++ b/pkg/ccl/changefeedccl/cdcevent/event_test.go @@ -15,7 +15,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" - "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -149,7 +148,7 @@ CREATE TABLE foo ( for _, tc := range []struct { testName string familyName string // Must be set if targetType ChangefeedTargetSpecification_COLUMN_FAMILY - virtualColumn changefeedbase.VirtualColumnVisibility + includeVirtual bool actions []string expectMainFamily []decodeExpectation expectOnlyCFamily []decodeExpectation @@ -167,10 +166,10 @@ CREATE TABLE foo ( }, }, { - testName: "main/primary_cols_with_virtual", - familyName: "main", - actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'second test')"}, - virtualColumn: changefeedbase.OptVirtualColumnsNull, + testName: "main/primary_cols_with_virtual", + familyName: "main", + actions: []string{"INSERT INTO foo (a, b) VALUES (1, 'second test')"}, + includeVirtual: true, expectMainFamily: []decodeExpectation{ { keyValues: []string{"second test", "1"}, @@ -286,26 +285,21 @@ CREATE TABLE foo ( targetType = jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY } - details := jobspb.ChangefeedDetails{ - Opts: map[string]string{ - changefeedbase.OptVirtualColumns: string(tc.virtualColumn), - }, - TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ - { - Type: targetType, - TableID: tableDesc.GetID(), - FamilyName: tc.familyName, - }, - }, - } - for _, action := range tc.actions { sqlDB.Exec(t, action) } + targets := []jobspb.ChangefeedTargetSpecification{ + { + Type: targetType, + TableID: tableDesc.GetID(), + FamilyName: tc.familyName, + }, + } serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig ctx := context.Background() - decoder := NewEventDecoder(ctx, &serverCfg, details) + decoder, err := NewEventDecoder(ctx, &serverCfg, targets, tc.includeVirtual) + require.NoError(t, err) expectedEvents := len(tc.expectMainFamily) + len(tc.expectOnlyCFamily) for i := 0; i < expectedEvents; i++ { v := popRow(t) diff --git a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go index 8435ae987f44..018c08cfa877 100644 --- a/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go +++ b/pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go @@ -80,7 +80,10 @@ func newRowFetcherCache( cf *descs.CollectionFactory, db *kv.DB, specs []jobspb.ChangefeedTargetSpecification, -) *rowFetcherCache { +) (*rowFetcherCache, error) { + if len(specs) == 0 { + return nil, errors.AssertionFailedf("Expected at least one spec, found 0") + } watchedFamilies := make(map[watchedFamily]struct{}, len(specs)) for _, s := range specs { watchedFamilies[watchedFamily{tableID: s.TableID, familyName: s.FamilyName}] = struct{}{} @@ -92,7 +95,7 @@ func newRowFetcherCache( db: db, fetchers: cache.NewUnorderedCache(defaultCacheConfig), watchedFamilies: watchedFamilies, - } + }, nil } func refreshUDT( diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index c28d8c5fcc1b..ab8879cade1c 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -168,7 +168,7 @@ func newChangeAggregatorProcessor( } if _, needTopics := ca.spec.Feed.Opts[changefeedbase.OptTopicInValue]; needTopics { - ca.topicNamer, err = MakeTopicNamer(ca.spec.Feed.TargetSpecifications) + ca.topicNamer, err = MakeTopicNamer(AllTargets(ca.spec.Feed)) if err != nil { return nil, err } @@ -297,9 +297,16 @@ func (ca *changeAggregator) Start(ctx context.Context) { return } - ca.eventConsumer = newKVEventToRowConsumer( + ca.eventConsumer, err = newKVEventToRowConsumer( ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater, ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer) + + if err != nil { + // Early abort in the case that there is an error setting up the consumption. + ca.MoveToDraining(err) + ca.cancel() + return + } } func (ca *changeAggregator) startKVFeed( diff --git a/pkg/ccl/changefeedccl/changefeed_stmt.go b/pkg/ccl/changefeedccl/changefeed_stmt.go index 193630e75884..d14042ce636b 100644 --- a/pkg/ccl/changefeedccl/changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/changefeed_stmt.go @@ -1208,7 +1208,9 @@ func AllTargets(cd jobspb.ChangefeedDetails) (targets []jobspb.ChangefeedTargetS if len(cd.TargetSpecifications) > 0 { for _, ts := range cd.TargetSpecifications { if ts.TableID > 0 { - ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName + if ts.StatementTimeName == "" { + ts.StatementTimeName = cd.Tables[ts.TableID].StatementTimeName + } targets = append(targets, ts) } } diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index f67b6ea43d64..86ace1b844df 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -55,18 +55,23 @@ func newKVEventToRowConsumer( details jobspb.ChangefeedDetails, knobs TestingKnobs, topicNamer *TopicNamer, -) *kvEventToRowConsumer { +) (*kvEventToRowConsumer, error) { + includeVirtual := details.Opts[changefeedbase.OptVirtualColumns] == string(changefeedbase.OptVirtualColumnsNull) + decoder, err := cdcevent.NewEventDecoder(ctx, cfg, AllTargets(details), includeVirtual) + if err != nil { + return nil, err + } return &kvEventToRowConsumer{ frontier: frontier, encoder: encoder, - decoder: cdcevent.NewEventDecoder(ctx, cfg, details), + decoder: decoder, sink: sink, cursor: cursor, details: details, knobs: knobs, topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor), topicNamer: topicNamer, - } + }, nil } func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (TopicDescriptor, error) { @@ -75,7 +80,7 @@ func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (Topic return topic, nil } } - for _, s := range c.details.TargetSpecifications { + for _, s := range AllTargets(c.details) { if s.TableID == eventMeta.TableID && (s.FamilyName == "" || s.FamilyName == eventMeta.FamilyName) { topic, err := makeTopicDescriptorFromSpec(s, eventMeta) if err != nil { diff --git a/pkg/ccl/changefeedccl/event_processing_test.go b/pkg/ccl/changefeedccl/event_processing_test.go new file mode 100644 index 000000000000..d24ee992ff93 --- /dev/null +++ b/pkg/ccl/changefeedccl/event_processing_test.go @@ -0,0 +1,168 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package changefeedccl + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestTopicForEvent(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Test verifies that topic naming works for various combinations of table/families. + // Versions prior to 22.1 did not initialize TargetSpecifications, and used Tables instead. + // Both flavors are tested -- with "old proto" tests testing topic naming with "Tables" field. + for _, tc := range []struct { + name string + details jobspb.ChangefeedDetails + event cdcevent.Metadata + expectErr string + topicName string + }{ + { + name: "old proto", + details: jobspb.ChangefeedDetails{ + Tables: jobspb.ChangefeedTargets{1: jobspb.ChangefeedTargetTable{StatementTimeName: "t1"}}, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "t1"}, + topicName: "t1", + }, + { + name: "old proto no such topic", + details: jobspb.ChangefeedDetails{ + Tables: jobspb.ChangefeedTargets{ + 1: jobspb.ChangefeedTargetTable{StatementTimeName: "t1"}, + 2: jobspb.ChangefeedTargetTable{StatementTimeName: "t2"}, + }, + }, + event: cdcevent.Metadata{TableID: 3}, + expectErr: "no TargetSpecification for row", + }, + { + name: "old proto table renamed", + details: jobspb.ChangefeedDetails{ + Tables: jobspb.ChangefeedTargets{1: jobspb.ChangefeedTargetTable{StatementTimeName: "old_name"}}, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "new_name"}, + topicName: "old_name", + }, + { + name: "old proto family ignored", + details: jobspb.ChangefeedDetails{ + Tables: jobspb.ChangefeedTargets{1: jobspb.ChangefeedTargetTable{StatementTimeName: "t1"}}, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "t1", FamilyID: 2, FamilyName: "fam"}, + topicName: "t1", + }, + { + name: "full table", + details: jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + TableID: 1, + StatementTimeName: "t1", + }, + }, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "t1"}, + topicName: "t1", + }, + { + name: "full table renamed", + details: jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + TableID: 1, + StatementTimeName: "old_name", + }, + }, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "new_name"}, + topicName: "old_name", + }, + { + name: "single family", + details: jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, + TableID: 1, + FamilyName: "fam", + StatementTimeName: "t1", + }, + }, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "new_name", FamilyName: "fam", FamilyID: 1}, + topicName: "t1.fam", + }, + { + name: "each family", + details: jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: jobspb.ChangefeedTargetSpecification_EACH_FAMILY, + TableID: 1, + FamilyName: "fam", + StatementTimeName: "old_name", + }, + { + Type: jobspb.ChangefeedTargetSpecification_EACH_FAMILY, + TableID: 1, + FamilyName: "fam2", + StatementTimeName: "old_name", + }, + }, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "new_name", FamilyName: "fam2", FamilyID: 2}, + topicName: "old_name.fam2", + }, + { + name: "wrong family", + details: jobspb.ChangefeedDetails{ + TargetSpecifications: []jobspb.ChangefeedTargetSpecification{ + { + Type: jobspb.ChangefeedTargetSpecification_COLUMN_FAMILY, + TableID: 1, + FamilyName: "fam", + StatementTimeName: "t1", + }, + }, + }, + event: cdcevent.Metadata{TableID: 1, TableName: "new_name", FamilyName: "wrong", FamilyID: 0}, + expectErr: "no TargetSpecification for row", + }, + } { + t.Run(tc.name, func(t *testing.T) { + c := kvEventToRowConsumer{ + details: tc.details, + topicDescriptorCache: make(map[TopicIdentifier]TopicDescriptor), + } + tn, err := MakeTopicNamer(AllTargets(tc.details)) + require.NoError(t, err) + + td, err := c.topicForEvent(tc.event) + if tc.expectErr == "" { + require.NoError(t, err) + topicName, err := tn.Name(td) + require.NoError(t, err) + require.Equal(t, tc.topicName, topicName) + } else { + require.Regexp(t, tc.expectErr, err) + require.Equal(t, noTopic{}, td) + } + }) + } +}