diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 47211ec35631..85fae2a501f7 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -996,6 +996,195 @@ func TestChangefeedUserDefinedTypes(t *testing.T) { cdcTest(t, testFn) } +// If the schema_change_policy is 'stop' and we drop columns which are not +// targeted by the changefeed, it should not stop. +func TestNoStopAfterNonTargetColumnDrop(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + }) + + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + }) + + // Check that dropping a watched column still stops the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) { + t.Errorf(`expected "schema change occurred at ..." got: %+v`, err.Error()) + } + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +// If we drop columns which are not targeted by the changefeed, it should not backfill. +func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + }) + + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + }) + + // Check that dropping a watched column still backfills. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN c`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b"}}`, + `hasfams.b_and_c: [1]->{"after": {"b": "b1"}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `CREATE TABLE nofams (id int primary key, a string, b string, c string)`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + sqlDB.Exec(t, `INSERT INTO nofams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE nofams`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + `nofams: [0]->{"after": {"a": "a", "b": "b", "c": "c", "id": 0}}`, + }) + + // Dropping an unwatched column from hasfams does not affect the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + sqlDB.Exec(t, `INSERT INTO nofams VALUES (1, 'a1', 'b1', 'c1')`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + `nofams: [1]->{"after": {"a": "a1", "b": "b1", "c": "c1", "id": 1}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"c": "c"}}`, + `hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE nofams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `nofams: [0]->{"after": {"a": "a", "c": "c", "id": 0}}`, + `nofams: [1]->{"after": {"a": "a1", "c": "c1", "id": 1}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `CREATE TABLE alsohasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + sqlDB.Exec(t, `INSERT INTO alsohasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + `alsohasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`, + }) + + // Dropping an unwatched column from hasfams does not affect the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`) + sqlDB.Exec(t, `INSERT INTO alsohasfams VALUES (1, 'a1', 'b1', 'c1')`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`, + `alsohasfams.id_a: [1]->{"after": {"a": "a1", "id": 1}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE alsohasfams DROP COLUMN a`) + assertPayloads(t, cf, []string{ + `alsohasfams.id_a: [0]->{"after": {"id": 0}}`, + `alsohasfams.id_a: [1]->{"after": {"id": 1}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"c": "c"}}`, + `hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + +func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) { + defer leaktest.AfterTest(t)() + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + + sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`) + sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`) + + // Open up the changefeed. + cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`) + defer closeFeed(t, cf) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`, + `hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`) + assertPayloads(t, cf, []string{ + `hasfams.id_a: [0]->{"after": {"id": 0}}`, + }) + + // Check that dropping a watched column will backfill the changefeed. + sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`) + assertPayloads(t, cf, []string{ + `hasfams.b_and_c: [0]->{"after": {"c": "c"}}`, + }) + } + + cdcTest(t, testFn, feedTestOmitSinks("sinkless")) +} + func TestChangefeedExternalIODisabled(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/ccl/changefeedccl/changefeedbase/target.go b/pkg/ccl/changefeedccl/changefeedbase/target.go index 0e6591f3859b..22bd8dfd5b2e 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/target.go +++ b/pkg/ccl/changefeedccl/changefeedbase/target.go @@ -26,7 +26,11 @@ type Target struct { type StatementTimeName string type targetsByTable struct { - wholeTable *Target + // wholeTable is a single target set when the target has no specified column + // families + wholeTable *Target + // byFamilyName is populated only if there are multiple column family targets + // for the table byFamilyName map[string]Target } @@ -83,6 +87,21 @@ func (ts *Targets) EachTarget(f func(Target) error) error { return nil } +// GetSpecifiedColumnFamilies returns a set of watched families +// belonging to the table. +func (ts *Targets) GetSpecifiedColumnFamilies(tableID descpb.ID) map[string]*struct{} { + target, exists := ts.m[tableID] + if !exists { + return make(map[string]*struct{}) + } + + families := make(map[string]*struct{}, len(target.byFamilyName)) + for family := range target.byFamilyName { + families[family] = nil + } + return families +} + // EachTableID iterates over unique TableIDs referenced in Targets. func (ts *Targets) EachTableID(f func(descpb.ID) error) error { for id := range ts.m { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 3d4ed2ebcd05..ca5a3b1b88f1 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -287,11 +287,18 @@ func (f *kvFeed) run(ctx context.Context) (err error) { if err != nil { return err } + // Detect whether the event corresponds to a primary index change. Also - // detect whether that primary index change corresponds to any change in - // the primary key or in the set of visible columns. If it corresponds to - // no such change, than it may be a column being dropped physically and - // should not trigger a failure in the `stop` policy. + // detect whether the change corresponds to any change in the set of visible + // primary key columns. + // + // If a primary key is being changed and there are no changes in the + // primary key's columns, this may be due to a column which was dropped + // logically before and is presently being physically dropped. + // + // If is no change in the primary key columns, then a primary key change + // should not trigger a failure in the `stop` policy because this change is + // effectively invisible to consumers. primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events) if primaryIndexChange && (noColumnChanges || f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) { diff --git a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel index a01b601a4ed0..99f0bc4552cb 100644 --- a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", "//pkg/storage", + "//pkg/util", "//pkg/util/contextutil", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/changefeedccl/schemafeed/helpers_test.go b/pkg/ccl/changefeedccl/schemafeed/helpers_test.go index 91f61db019a1..7f1f045a3f84 100644 --- a/pkg/ccl/changefeedccl/schemafeed/helpers_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/helpers_test.go @@ -8,7 +8,12 @@ package schemafeed -import "strings" +import ( + "strings" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" +) const TestingAllEventFilter = "testing" @@ -35,3 +40,9 @@ func PrintTableEventType(t tableEventType) string { } return strings.Join(strs, "|") } + +func CreateChangefeedTargets(tableID descpb.ID) changefeedbase.Targets { + targets := changefeedbase.Targets{} + targets.Add(changefeedbase.Target{TableID: tableID}) + return targets +} diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 9437f9935da4..ef01d8087116 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -560,7 +560,7 @@ func (tf *schemaFeed) validateDescriptor( Before: lastVersion, After: desc, } - shouldFilter, err := tf.filter.shouldFilter(ctx, e) + shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets) log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter) if err != nil { return err diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go index b7f3c519e387..38a5c5e2e746 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go @@ -14,7 +14,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" + "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/errors" ) @@ -122,7 +124,9 @@ func classifyTableEvent(e TableEvent) tableEventTypeSet { // permitted by the filter. type tableEventFilter map[tableEventType]bool -func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (bool, error) { +func (filter tableEventFilter) shouldFilter( + ctx context.Context, e TableEvent, targets changefeedbase.Targets, +) (bool, error) { et := classifyTableEvent(e) // Truncation events are not ignored and return an error. @@ -141,7 +145,19 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) ( shouldFilter := true for filterEvent, filterPolicy := range filter { if et.Contains(filterEvent) && !filterPolicy { - shouldFilter = false + // Apply changefeed target-specific filters. + // In some cases, a drop column event should be filtered out. + // For example, we may be dropping a column which is not + // monitored by the changefeed. + if filterEvent == tableEventDropColumn { + sf, err := shouldFilterDropColumnEvent(e, targets) + if err != nil { + return false, err + } + shouldFilter = sf + } else { + shouldFilter = false + } } et = et.Clear(filterEvent) } @@ -151,6 +167,48 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) ( return shouldFilter, nil } +// shouldFilterDropColumnEvent decides if we should filter out a drop column event. +func shouldFilterDropColumnEvent(e TableEvent, targets changefeedbase.Targets) (bool, error) { + if watched, err := droppedColumnIsWatched(e, targets); err != nil { + return false, err + } else if watched { + return false, nil + } + return true, nil +} + +// Returns true if the changefeed targets a column which has a drop mutation inside the table event. +func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) { + // If no column families are specified, then all columns are targeted. + specifiedColumnFamiliesForTable := targets.GetSpecifiedColumnFamilies(e.Before.GetID()) + if len(specifiedColumnFamiliesForTable) == 0 { + return true, nil + } + + var watchedColumnIDs util.FastIntSet + if err := e.Before.ForeachFamily(func(family *descpb.ColumnFamilyDescriptor) error { + if _, ok := specifiedColumnFamiliesForTable[family.Name]; ok { + for _, columnID := range family.ColumnIDs { + watchedColumnIDs.Add(int(columnID)) + } + } + return nil + }); err != nil { + return false, err + } + + for _, m := range e.After.AllMutations() { + if m.AsColumn() == nil || m.AsColumn().IsHidden() { + continue + } + if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) { + return true, nil + } + } + + return false, nil +} + func hasNewVisibleColumnDropBackfillMutation(e TableEvent) (res bool) { // Make sure that the old descriptor *doesn't* have the same mutation to avoid adding // the same scan boundary more than once. diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go index c6bb55ab805b..3061af4508d8 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go @@ -254,15 +254,18 @@ func TestTableEventFilterErrorsWithIncompletePolicy(t *testing.T) { Before: mkTableDesc(42, 1, ts(2), 2, 1), After: dropColBackfill(mkTableDesc(42, 2, ts(3), 1, 1)), } - _, err := incompleteFilter.shouldFilter(context.Background(), dropColEvent) + changefeedTargets := CreateChangefeedTargets(42) + + _, err := incompleteFilter.shouldFilter(context.Background(), dropColEvent, changefeedTargets) require.Error(t, err) unknownEvent := TableEvent{ Before: mkTableDesc(42, 1, ts(2), 2, 1), After: mkTableDesc(42, 1, ts(2), 2, 1), } - _, err = incompleteFilter.shouldFilter(context.Background(), unknownEvent) + _, err = incompleteFilter.shouldFilter(context.Background(), unknownEvent, changefeedTargets) require.Error(t, err) + } func TestTableEventFilter(t *testing.T) { @@ -388,7 +391,7 @@ func TestTableEventFilter(t *testing.T) { }, } { t.Run(c.name, func(t *testing.T) { - shouldFilter, err := c.p.shouldFilter(context.Background(), c.e) + shouldFilter, err := c.p.shouldFilter(context.Background(), c.e, CreateChangefeedTargets(42)) require.NoError(t, err) require.Equalf(t, c.exp, shouldFilter, "event %v", c.e) })