Skip to content

Commit

Permalink
changefeedccl/schemafeed: ignore unwatched column drops
Browse files Browse the repository at this point in the history
See release note.

Fixes cockroachdb#80982

Release note (enterprise change): Previously, if you dropped
a column with the schema_change_policy 'stop', the changefeed
would stop. Dropping a column with a different policy would
result in previous rows being retransmitted with the
dropped column omitted.

In some cases, a changefeed may target specific columns
(a column family) of a table. In these cases, if a non-target
column is dropped, it does not make sense to stop the changefeed
or retransmit values because the column was not visible to
a consumer sink to begin with.

With this change, dropping an non-target column from a
table will not stop the changefeed when the
schema_change_policy is 'stop'. With any other policy,
dropping a non-target column will not trigger a backfill.
  • Loading branch information
jayshrivastava committed Jul 22, 2022
1 parent fd335c7 commit c6f667c
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 12 deletions.
189 changes: 189 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,195 @@ func TestChangefeedUserDefinedTypes(t *testing.T) {
cdcTest(t, testFn)
}

// If the schema_change_policy is 'stop' and we drop columns which are not
// targeted by the changefeed, it should not stop.
func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c WITH schema_change_policy='stop'`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
})

sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)

assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
})

// Check that dropping a watched column still stops the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
if _, err := cf.Next(); !testutils.IsError(err, `schema change occurred at`) {
t.Errorf(`expected "schema change occurred at ..." got: %+v`, err.Error())
}
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

// If we drop columns which are not targeted by the changefeed, it should not backfill.
func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
})

sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
})

// Check that dropping a watched column still backfills.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN c`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b"}}`,
`hasfams.b_and_c: [1]->{"after": {"b": "b1"}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `CREATE TABLE nofams (id int primary key, a string, b string, c string)`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
sqlDB.Exec(t, `INSERT INTO nofams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE nofams`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
`nofams: [0]->{"after": {"a": "a", "b": "b", "c": "c", "id": 0}}`,
})

// Dropping an unwatched column from hasfams does not affect the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
sqlDB.Exec(t, `INSERT INTO nofams VALUES (1, 'a1', 'b1', 'c1')`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
`nofams: [1]->{"after": {"a": "a1", "b": "b1", "c": "c1", "id": 1}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
`hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE nofams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`nofams: [0]->{"after": {"a": "a", "c": "c", "id": 0}}`,
`nofams: [1]->{"after": {"a": "a1", "c": "c1", "id": 1}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `CREATE TABLE alsohasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)
sqlDB.Exec(t, `INSERT INTO alsohasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY b_and_c, TABLE alsohasfams FAMILY id_a`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
`alsohasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
})

// Dropping an unwatched column from hasfams does not affect the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
sqlDB.Exec(t, `INSERT INTO hasfams VALUES (1, 'b1', 'c1')`)
sqlDB.Exec(t, `INSERT INTO alsohasfams VALUES (1, 'a1', 'b1', 'c1')`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [1]->{"after": {"b": "b1", "c": "c1"}}`,
`alsohasfams.id_a: [1]->{"after": {"a": "a1", "id": 1}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE alsohasfams DROP COLUMN a`)
assertPayloads(t, cf, []string{
`alsohasfams.id_a: [0]->{"after": {"id": 0}}`,
`alsohasfams.id_a: [1]->{"after": {"id": 1}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
`hasfams.b_and_c: [1]->{"after": {"c": "c1"}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE hasfams (id int primary key, a string, b string, c string, FAMILY id_a (id, a), FAMILY b_and_c (b, c))`)
sqlDB.Exec(t, `INSERT INTO hasfams values (0, 'a', 'b', 'c')`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE hasfams FAMILY id_a, TABLE hasfams FAMILY b_and_c`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"b": "b", "c": "c"}}`,
`hasfams.id_a: [0]->{"after": {"a": "a", "id": 0}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN a`)
assertPayloads(t, cf, []string{
`hasfams.id_a: [0]->{"after": {"id": 0}}`,
})

// Check that dropping a watched column will backfill the changefeed.
sqlDB.Exec(t, `ALTER TABLE hasfams DROP COLUMN b`)
assertPayloads(t, cf, []string{
`hasfams.b_and_c: [0]->{"after": {"c": "c"}}`,
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
}

func TestChangefeedExternalIODisabled(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
21 changes: 20 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ type Target struct {
type StatementTimeName string

type targetsByTable struct {
wholeTable *Target
// wholeTable is a single target set when the target has no specified column
// families
wholeTable *Target
// byFamilyName is populated only if there are multiple column family targets
// for the table
byFamilyName map[string]Target
}

Expand Down Expand Up @@ -83,6 +87,21 @@ func (ts *Targets) EachTarget(f func(Target) error) error {
return nil
}

// GetSpecifiedColumnFamilies returns a set of watched families
// belonging to the table.
func (ts *Targets) GetSpecifiedColumnFamilies(tableID descpb.ID) map[string]*struct{} {
target, exists := ts.m[tableID]
if !exists {
return make(map[string]*struct{})
}

families := make(map[string]*struct{}, len(target.byFamilyName))
for family := range target.byFamilyName {
families[family] = nil
}
return families
}

// EachTableID iterates over unique TableIDs referenced in Targets.
func (ts *Targets) EachTableID(f func(descpb.ID) error) error {
for id := range ts.m {
Expand Down
15 changes: 11 additions & 4 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,18 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
if err != nil {
return err
}

// Detect whether the event corresponds to a primary index change. Also
// detect whether that primary index change corresponds to any change in
// the primary key or in the set of visible columns. If it corresponds to
// no such change, than it may be a column being dropped physically and
// should not trigger a failure in the `stop` policy.
// detect whether the change corresponds to any change in the set of visible
// primary key columns.
//
// If a primary key is being changed and there are no changes in the
// primary key's columns, this may be due to a column which was dropped
// logically before and is presently being physically dropped.
//
// If is no change in the primary key columns, then a primary key change
// should not trigger a failure in the `stop` policy because this change is
// effectively invisible to consumers.
primaryIndexChange, noColumnChanges := isPrimaryKeyChange(events)
if primaryIndexChange && (noColumnChanges ||
f.schemaChangePolicy != changefeedbase.OptSchemaChangePolicyStop) {
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/schemafeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ go_library(
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/encoding",
"//pkg/util/hlc",
Expand Down
13 changes: 12 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@

package schemafeed

import "strings"
import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

const TestingAllEventFilter = "testing"

Expand All @@ -35,3 +40,9 @@ func PrintTableEventType(t tableEventType) string {
}
return strings.Join(strs, "|")
}

func CreateChangefeedTargets(tableID descpb.ID) changefeedbase.Targets {
targets := changefeedbase.Targets{}
targets.Add(changefeedbase.Target{TableID: tableID})
return targets
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (tf *schemaFeed) validateDescriptor(
Before: lastVersion,
After: desc,
}
shouldFilter, err := tf.filter.shouldFilter(ctx, e)
shouldFilter, err := tf.filter.shouldFilter(ctx, e, tf.targets)
log.VEventf(ctx, 1, "validate shouldFilter %v %v", formatEvent(e), shouldFilter)
if err != nil {
return err
Expand Down
62 changes: 60 additions & 2 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -122,7 +124,9 @@ func classifyTableEvent(e TableEvent) tableEventTypeSet {
// permitted by the filter.
type tableEventFilter map[tableEventType]bool

func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (bool, error) {
func (filter tableEventFilter) shouldFilter(
ctx context.Context, e TableEvent, targets changefeedbase.Targets,
) (bool, error) {
et := classifyTableEvent(e)

// Truncation events are not ignored and return an error.
Expand All @@ -141,7 +145,19 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (
shouldFilter := true
for filterEvent, filterPolicy := range filter {
if et.Contains(filterEvent) && !filterPolicy {
shouldFilter = false
// Apply changefeed target-specific filters.
// In some cases, a drop column event should be filtered out.
// For example, we may be dropping a column which is not
// monitored by the changefeed.
if filterEvent == tableEventDropColumn {
sf, err := shouldFilterDropColumnEvent(e, targets)
if err != nil {
return false, err
}
shouldFilter = sf
} else {
shouldFilter = false
}
}
et = et.Clear(filterEvent)
}
Expand All @@ -151,6 +167,48 @@ func (filter tableEventFilter) shouldFilter(ctx context.Context, e TableEvent) (
return shouldFilter, nil
}

// shouldFilterDropColumnEvent decides if we should filter out a drop column event.
func shouldFilterDropColumnEvent(e TableEvent, targets changefeedbase.Targets) (bool, error) {
if watched, err := droppedColumnIsWatched(e, targets); err != nil {
return false, err
} else if watched {
return false, nil
}
return true, nil
}

// Returns true if the changefeed targets a column which has a drop mutation inside the table event.
func droppedColumnIsWatched(e TableEvent, targets changefeedbase.Targets) (bool, error) {
// If no column families are specified, then all columns are targeted.
specifiedColumnFamiliesForTable := targets.GetSpecifiedColumnFamilies(e.Before.GetID())
if len(specifiedColumnFamiliesForTable) == 0 {
return true, nil
}

var watchedColumnIDs util.FastIntSet
if err := e.Before.ForeachFamily(func(family *descpb.ColumnFamilyDescriptor) error {
if _, ok := specifiedColumnFamiliesForTable[family.Name]; ok {
for _, columnID := range family.ColumnIDs {
watchedColumnIDs.Add(int(columnID))
}
}
return nil
}); err != nil {
return false, err
}

for _, m := range e.After.AllMutations() {
if m.AsColumn() == nil || m.AsColumn().IsHidden() {
continue
}
if m.Dropped() && m.WriteAndDeleteOnly() && watchedColumnIDs.Contains(int(m.AsColumn().GetID())) {
return true, nil
}
}

return false, nil
}

func hasNewVisibleColumnDropBackfillMutation(e TableEvent) (res bool) {
// Make sure that the old descriptor *doesn't* have the same mutation to avoid adding
// the same scan boundary more than once.
Expand Down
Loading

0 comments on commit c6f667c

Please sign in to comment.