From 54f6c6e7f67fce184fdd32d65d220714603f3696 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 8 Apr 2021 16:15:34 +0100 Subject: [PATCH 1/2] changefeedccl: disallow changefeeds on regional by row tables To fully support changefeeds on regional by row tables, we need to make a number of technical- and product-level decisions and changes. Until those changes are made, we want to disable changefeeds on such tables. This change adds a new check in the table validator that will prevent new changefeeds from being started on a table where IsLocalityRegionalByRow() is true. Further, existing feeds will fail with a non-retriable error. Fixes #63239 Release note (enterprise change): Changefeeds will now fail on any regional by row table with an error such as: CHANGEFEED cannot target REGIONAL BY ROW tables: TABLE_NAME This is to prevent unexpected changefeed behavior until we can offer full support for regional by row tables. --- pkg/ccl/changefeedccl/BUILD.bazel | 2 + pkg/ccl/changefeedccl/changefeed_test.go | 73 ++++++++++++++++++- .../changefeedccl/changefeedbase/validate.go | 3 + pkg/ccl/changefeedccl/helpers_test.go | 24 +++++- pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 2 +- 5 files changed, 99 insertions(+), 5 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 2dfb3b3615b8..53392f5539a0 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -123,6 +123,8 @@ go_test( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/importccl", + "//pkg/ccl/multiregionccl", + "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/clusterversion", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index ad3844a80c2f..753deedb548b 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -27,6 +27,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" 
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + // Imported to allow locality-related table mutations + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -67,6 +70,8 @@ import ( "github.com/stretchr/testify/require" ) +var testServerRegion = "us-east-1" + func TestChangefeedBasics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1175,6 +1180,55 @@ func TestChangefeedAuthorization(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) } +func TestChangefeedFailOnRBRChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`) + assertRBRError := func(ctx context.Context, f cdctest.TestFeed) { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + done := make(chan struct{}) + go func() { + if _, err := f.Next(); err != nil { + assert.Regexp(t, rbrErrorRegex, err) + done <- struct{}{} + } + }() + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for changefeed to fail") + case <-done: + } + } + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'") + t.Run("regional by row", func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`) + defer sqlDB.Exec(t, `DROP TABLE rbr`) + sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`) + rbr := feed(t, f, `CREATE CHANGEFEED FOR rbr `) + defer closeFeed(t, rbr) + sqlDB.Exec(t, `INSERT INTO rbr VALUES (1, 2)`) + assertPayloads(t, rbr, []string{ + `rbr: [0]->{"after": {"a": 0, "b": 
null}}`, + `rbr: [1]->{"after": {"a": 1, "b": 2}}`, + }) + sqlDB.Exec(t, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`) + assertRBRError(context.Background(), rbr) + }) + } + withTestServerRegion := func(args *base.TestServerArgs) { + args.Locality.Tiers = append(args.Locality.Tiers, roachpb.Tier{ + Key: "region", + Value: testServerRegion, + }) + } + t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn)) + t.Run("enterprise", enterpriseTestWithServerArgs(withTestServerRegion, testFn)) +} + func TestChangefeedStopOnSchemaChange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1963,7 +2017,14 @@ func TestChangefeedErrors(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{{ + Key: "region", + Value: testServerRegion, + }}, + }, + }) defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) @@ -2043,6 +2104,16 @@ func TestChangefeedErrors(t *testing.T) { t, `CHANGEFEED cannot target views: vw`, `EXPERIMENTAL CHANGEFEED FOR vw`, ) + + // Regional by row tables are not currently supported + sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE defaultdb PRIMARY REGION "%s"`, testServerRegion)) + sqlDB.Exec(t, `CREATE TABLE test_cdc_rbr_fails (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `ALTER TABLE test_cdc_rbr_fails SET LOCALITY REGIONAL BY ROW`) + sqlDB.ExpectErr( + t, `CHANGEFEED cannot target REGIONAL BY ROW tables: test_cdc_rbr_fails`, + `CREATE CHANGEFEED FOR test_cdc_rbr_fails`, + ) + // Backup has the same bad error message #28170. 
sqlDB.ExpectErr( t, `"information_schema.tables" does not exist`, diff --git a/pkg/ccl/changefeedccl/changefeedbase/validate.go b/pkg/ccl/changefeedccl/changefeedbase/validate.go index 8367b7850572..ce38aa8055f0 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/validate.go +++ b/pkg/ccl/changefeedccl/changefeedbase/validate.go @@ -40,6 +40,9 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc if tableDesc.IsSequence() { return errors.Errorf(`CHANGEFEED cannot target sequences: %s`, tableDesc.GetName()) } + if tableDesc.IsLocalityRegionalByRow() { + return errors.Errorf(`CHANGEFEED cannot target REGIONAL BY ROW tables: %s`, tableDesc.GetName()) + } if len(tableDesc.GetFamilies()) != 1 { return errors.Errorf( `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index f30c0d96c9fc..4af3ba540904 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -24,6 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + // Imported to allow locality-related table mutations + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -246,7 +249,7 @@ func expectResolvedTimestampAvro( return parseTimeToHLC(t, resolved.(map[string]interface{})[`string`].(string)) } -func sinlesttTestWithServerArgs( +func sinklessTestWithServerArgs( argsFn func(args *base.TestServerArgs), testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { @@ -280,7 +283,9 @@ func sinlesttTestWithServerArgs( // that we'll still use the 
row-by-row engine, see #55605). sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.vectorize=on`) sqlDB.Exec(t, `CREATE DATABASE d`) - + if region := serverArgsRegion(args); region != "" { + sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region)) + } sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) defer cleanup() f := cdctest.MakeSinklessFeedFactory(s, sink) @@ -289,7 +294,7 @@ func sinlesttTestWithServerArgs( } func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { - return sinlesttTestWithServerArgs(nil, testFn) + return sinklessTestWithServerArgs(nil, testFn) } func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { @@ -330,6 +335,10 @@ func enterpriseTestWithServerArgs( sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) sqlDB.Exec(t, `CREATE DATABASE d`) + + if region := serverArgsRegion(args); region != "" { + sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region)) + } sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) defer cleanup() f := cdctest.MakeTableFeedFactory(s, db, flushCh, sink) @@ -338,6 +347,15 @@ func enterpriseTestWithServerArgs( } } +func serverArgsRegion(args base.TestServerArgs) string { + for _, tier := range args.Locality.Tiers { + if tier.Key == "region" { + return tier.Value + } + } + return "" +} + func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 32791d6232f6..262a61a5a260 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -124,7 +124,7 @@ type schemaChangeDetectedError struct { } func (e schemaChangeDetectedError) 
Error() string { - return fmt.Sprintf("schema change deteceted at %v", e.ts) + return fmt.Sprintf("schema change detected at %v", e.ts) } type schemaFeed interface { From a1bdf28464adbdf4f4b8df1d6ef283a3c0d9fd4a Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 8 Apr 2021 16:22:59 +0100 Subject: [PATCH 2/2] changefeed: detect regional by row changes in kv_feed This isn't strictly necessary as the schemafeed currently fails with an unretriable error if it fails to validate the table descriptor and regional by row tables will fail to validate. However, it might be useful in general to be able to detect unsupported schema migrations at this location since other schema changes that we might want to disallow will not necessarily force a table descriptor validation failure. Release note: None --- pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 30 +++++++++- .../schematestutils/schema_test_utils.go | 11 ++++ .../schemafeed/table_event_filter.go | 38 +++++++++---- .../schemafeed/table_event_filter_test.go | 55 +++++++++++++++++++ 4 files changed, 123 insertions(+), 11 deletions(-) diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 262a61a5a260..85ae07ddddd2 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -127,6 +127,15 @@ func (e schemaChangeDetectedError) Error() string { return fmt.Sprintf("schema change detected at %v", e.ts) } +type unsupportedSchemaChangeDetected struct { + desc string + ts hlc.Timestamp +} + +func (e unsupportedSchemaChangeDetected) Error() string { + return fmt.Sprintf("unsupported schema change %s detected at %s", e.desc, e.ts.AsOfSystemTime()) +} + type schemaFeed interface { Peek(ctx context.Context, atOrBefore hlc.Timestamp) (events []schemafeed.TableEvent, err error) Pop(ctx context.Context, atOrBefore hlc.Timestamp) (events []schemafeed.TableEvent, err error) @@ -214,7 +223,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) { 
boundaryType := jobspb.ResolvedSpan_BACKFILL if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop { boundaryType = jobspb.ResolvedSpan_EXIT - } else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isPrimaryKeyChange(events) { + } else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isRegionalByRowChange(events) { + // NOTE(ssd): The user is unlikely to see this + // error. The schemafeed will fail with a + // non-retriable error, meaning we likely + // return right after runUntilTableEvent + // above. + return unsupportedSchemaChangeDetected{ + desc: "SET REGIONAL BY ROW", + ts: highWater.Next(), + } + } else if err == nil && isPrimaryKeyChange(events) { boundaryType = jobspb.ResolvedSpan_RESTART } else if err != nil { return err @@ -245,6 +264,15 @@ func isPrimaryKeyChange(events []schemafeed.TableEvent) bool { return false } +func isRegionalByRowChange(events []schemafeed.TableEvent) bool { + for _, ev := range events { + if schemafeed.IsRegionalByRowChange(ev) { + return true + } + } + return false +} + func (f *kvFeed) scanIfShould( ctx context.Context, initialScan bool, highWater hlc.Timestamp, ) error { diff --git a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go index e8f8e368d607..03c2e23b3cc1 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go +++ b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go @@ -49,6 +49,17 @@ func MakeColumnDesc(id descpb.ColumnID) *descpb.ColumnDescriptor { } } +// SetLocalityRegionalByRow sets the LocalityConfig of the table +// descriptor such that desc.IsLocalityRegionalByRow will return true. 
+func SetLocalityRegionalByRow(desc catalog.TableDescriptor) catalog.TableDescriptor { + desc.TableDesc().LocalityConfig = &descpb.TableDescriptor_LocalityConfig{ + Locality: &descpb.TableDescriptor_LocalityConfig_RegionalByRow_{ + RegionalByRow: &descpb.TableDescriptor_LocalityConfig_RegionalByRow{}, + }, + } + return tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable() +} + // AddColumnDropBackfillMutation adds a mutation to desc to drop a column. // Yes, this does modify an immutable. func AddColumnDropBackfillMutation(desc catalog.TableDescriptor) catalog.TableDescriptor { diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go index 8473f31daebc..be4bf9a437cf 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go @@ -25,23 +25,26 @@ const ( tableEventTypeDropColumn tableEventTruncate tableEventPrimaryKeyChange + tableEventLocalityRegionalByRowChange ) var ( defaultTableEventFilter = tableEventFilter{ - tableEventTypeDropColumn: false, - tableEventTypeAddColumnWithBackfill: false, - tableEventTypeAddColumnNoBackfill: true, - tableEventTypeUnknown: true, - tableEventPrimaryKeyChange: false, + tableEventTypeDropColumn: false, + tableEventTypeAddColumnWithBackfill: false, + tableEventTypeAddColumnNoBackfill: true, + tableEventTypeUnknown: true, + tableEventPrimaryKeyChange: false, + tableEventLocalityRegionalByRowChange: false, } columnChangeTableEventFilter = tableEventFilter{ - tableEventTypeDropColumn: false, - tableEventTypeAddColumnWithBackfill: false, - tableEventTypeAddColumnNoBackfill: false, - tableEventTypeUnknown: true, - tableEventPrimaryKeyChange: false, + tableEventTypeDropColumn: false, + tableEventTypeAddColumnWithBackfill: false, + tableEventTypeAddColumnNoBackfill: false, + tableEventTypeUnknown: true, + tableEventPrimaryKeyChange: false, + tableEventLocalityRegionalByRowChange: false, } 
schemaChangeEventFilters = map[changefeedbase.SchemaChangeEventClass]tableEventFilter{ @@ -52,6 +55,11 @@ var ( func classifyTableEvent(e TableEvent) tableEventType { switch { + // Take care before changing the ordering here. Until we can + // classify events with multiple types, we need to ensure that + // we detect regionalByRow changes with priority. + case regionalByRowChanged(e): + return tableEventLocalityRegionalByRowChange case newColumnBackfillComplete(e): return tableEventTypeAddColumnWithBackfill case newColumnNoBackfill(e): @@ -135,8 +143,18 @@ func primaryKeyChanged(e TableEvent) bool { pkChangeMutationExists(e.Before) } +func regionalByRowChanged(e TableEvent) bool { + return e.Before.IsLocalityRegionalByRow() != e.After.IsLocalityRegionalByRow() +} + // IsPrimaryIndexChange returns true if the event corresponds to a change // in the primary index. func IsPrimaryIndexChange(e TableEvent) bool { return classifyTableEvent(e) == tableEventPrimaryKeyChange } + +// IsRegionalByRowChange returns true if the event corresponds to a +// change in the table's locality to or from RegionalByRow. 
+func IsRegionalByRowChange(e TableEvent) bool { + return classifyTableEvent(e) == tableEventLocalityRegionalByRowChange +} diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go index 76a0f759e7b3..f0dac93fa863 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go @@ -21,6 +21,42 @@ import ( "github.com/stretchr/testify/require" ) +func TestTableEventIsRegionalByRowChange(t *testing.T) { + ts := func(seconds int) hlc.Timestamp { + return hlc.Timestamp{WallTime: (time.Duration(seconds) * time.Second).Nanoseconds()} + } + mkTableDesc := schematestutils.MakeTableDesc + addColBackfill := schematestutils.AddNewColumnBackfillMutation + setRBR := schematestutils.SetLocalityRegionalByRow + for _, c := range []struct { + name string + e TableEvent + exp bool + }{ + { + name: "regional by row change", + e: TableEvent{ + Before: mkTableDesc(42, 1, ts(2), 2), + After: setRBR(mkTableDesc(42, 2, ts(3), 2)), + }, + exp: true, + }, + { + name: "add non-NULL column", + e: TableEvent{ + Before: addColBackfill(mkTableDesc(42, 3, ts(2), 1)), + After: mkTableDesc(42, 4, ts(4), 2), + }, + exp: false, + }, + } { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.exp, IsRegionalByRowChange(c.e)) + }) + } + +} + func TestTableEventFilter(t *testing.T) { ts := func(seconds int) hlc.Timestamp { return hlc.Timestamp{WallTime: (time.Duration(seconds) * time.Second).Nanoseconds()} @@ -28,6 +64,7 @@ func TestTableEventFilter(t *testing.T) { mkTableDesc := schematestutils.MakeTableDesc addColBackfill := schematestutils.AddNewColumnBackfillMutation dropColBackfill := schematestutils.AddColumnDropBackfillMutation + setRBR := schematestutils.SetLocalityRegionalByRow for _, c := range []struct { name string p tableEventFilter @@ -94,6 +131,15 @@ func TestTableEventFilter(t *testing.T) { }, exp: true, }, + { + name: "don't filter 
regional by row change", + p: defaultTableEventFilter, + e: TableEvent{ + Before: mkTableDesc(42, 1, ts(2), 2), + After: setRBR(mkTableDesc(42, 2, ts(3), 2)), + }, + exp: false, + }, { name: "columnChange - don't filter end of add NULL column", p: columnChangeTableEventFilter, @@ -103,6 +149,15 @@ func TestTableEventFilter(t *testing.T) { }, exp: false, }, + { + name: "columnChange - don't filter regional by row change", + p: columnChangeTableEventFilter, + e: TableEvent{ + Before: mkTableDesc(42, 1, ts(2), 2), + After: setRBR(mkTableDesc(42, 2, ts(3), 2)), + }, + exp: false, + }, } { t.Run(c.name, func(t *testing.T) { shouldFilter, err := c.p.shouldFilter(context.Background(), c.e)