diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 2dfb3b3615b8..53392f5539a0 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -123,6 +123,8 @@ go_test( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/changefeedccl/kvfeed", "//pkg/ccl/importccl", + "//pkg/ccl/multiregionccl", + "//pkg/ccl/partitionccl", "//pkg/ccl/storageccl", "//pkg/ccl/utilccl", "//pkg/clusterversion", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index ad3844a80c2f..753deedb548b 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -27,6 +27,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + // Imported to allow locality-related table mutations + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -67,6 +70,8 @@ import ( "github.com/stretchr/testify/require" ) +var testServerRegion = "us-east-1" + func TestChangefeedBasics(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1175,6 +1180,55 @@ func TestChangefeedAuthorization(t *testing.T) { t.Run(`enterprise`, enterpriseTest(testFn)) } +func TestChangefeedFailOnRBRChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`) + assertRBRError := func(ctx context.Context, f cdctest.TestFeed) { + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + done := make(chan struct{}) + go func() { + if _, err := f.Next(); err != nil { + assert.Regexp(t, rbrErrorRegex, err) + done <- struct{}{} + } + }() + select { + case <-ctx.Done(): + t.Fatal("timed out waiting for changefeed to fail") + case <-done: + } + } + testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'") + t.Run("regional by row", func(t *testing.T) { + sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`) + defer sqlDB.Exec(t, `DROP TABLE rbr`) + sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`) + rbr := feed(t, f, `CREATE CHANGEFEED FOR rbr `) + defer closeFeed(t, rbr) + sqlDB.Exec(t, `INSERT INTO rbr VALUES (1, 2)`) + assertPayloads(t, rbr, []string{ + `rbr: [0]->{"after": {"a": 0, "b": null}}`, + `rbr: [1]->{"after": {"a": 1, "b": 2}}`, + }) + sqlDB.Exec(t, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`) + assertRBRError(context.Background(), rbr) + }) + } + withTestServerRegion := func(args *base.TestServerArgs) { + args.Locality.Tiers = append(args.Locality.Tiers, roachpb.Tier{ + Key: "region", + Value: testServerRegion, + }) + } + t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn)) + t.Run("enterprise", enterpriseTestWithServerArgs(withTestServerRegion, testFn)) +} + func TestChangefeedStopOnSchemaChange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -1963,7 +2017,14 @@ func TestChangefeedErrors(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() - s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{{ + Key: "region", + Value: testServerRegion, + }}, + }, + }) defer s.Stopper().Stop(ctx) sqlDB := sqlutils.MakeSQLRunner(db) sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`) @@ -2043,6 +2104,16 @@ func TestChangefeedErrors(t *testing.T) { t, `CHANGEFEED cannot target views: vw`, `EXPERIMENTAL CHANGEFEED FOR vw`, ) + + // Regional by row tables are not currently supported + sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE defaultdb PRIMARY REGION "%s"`, testServerRegion)) + sqlDB.Exec(t, `CREATE TABLE test_cdc_rbr_fails (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `ALTER TABLE test_cdc_rbr_fails SET LOCALITY REGIONAL BY ROW`) + sqlDB.ExpectErr( + t, `CHANGEFEED cannot target REGIONAL BY ROW tables: test_cdc_rbr_fails`, + `CREATE CHANGEFEED FOR test_cdc_rbr_fails`, + ) + // Backup has the same bad error message #28170. sqlDB.ExpectErr( t, `"information_schema.tables" does not exist`, diff --git a/pkg/ccl/changefeedccl/changefeedbase/validate.go b/pkg/ccl/changefeedccl/changefeedbase/validate.go index 8367b7850572..ce38aa8055f0 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/validate.go +++ b/pkg/ccl/changefeedccl/changefeedbase/validate.go @@ -40,6 +40,9 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc if tableDesc.IsSequence() { return errors.Errorf(`CHANGEFEED cannot target sequences: %s`, tableDesc.GetName()) } + if tableDesc.IsLocalityRegionalByRow() { + return errors.Errorf(`CHANGEFEED cannot target REGIONAL BY ROW tables: %s`, tableDesc.GetName()) + } if len(tableDesc.GetFamilies()) != 1 { return errors.Errorf( `CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`, diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index f30c0d96c9fc..4af3ba540904 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -24,6 +24,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + // Imported to allow locality-related table mutations + _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" + _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" @@ -246,7 +249,7 @@ func expectResolvedTimestampAvro( return parseTimeToHLC(t, resolved.(map[string]interface{})[`string`].(string)) } -func sinlesttTestWithServerArgs( +func sinklessTestWithServerArgs( argsFn func(args *base.TestServerArgs), testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { @@ -280,7 +283,9 @@ func sinlesttTestWithServerArgs( // that we'll still use the row-by-row engine, see #55605). sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.vectorize=on`) sqlDB.Exec(t, `CREATE DATABASE d`) - + if region := serverArgsRegion(args); region != "" { + sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region)) + } sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) defer cleanup() f := cdctest.MakeSinklessFeedFactory(s, sink) @@ -289,7 +294,7 @@ func sinlesttTestWithServerArgs( } func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { - return sinlesttTestWithServerArgs(nil, testFn) + return sinklessTestWithServerArgs(nil, testFn) } func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) { @@ -330,6 +335,10 @@ func enterpriseTestWithServerArgs( sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`) sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`) sqlDB.Exec(t, `CREATE DATABASE d`) + + if region := serverArgsRegion(args); region != "" { + sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region)) + } sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser)) defer cleanup() f := cdctest.MakeTableFeedFactory(s, db, flushCh, sink) @@ -338,6 +347,15 @@ func enterpriseTestWithServerArgs( } } +func serverArgsRegion(args base.TestServerArgs) string { + for _, tier := range args.Locality.Tiers { + if tier.Key == "region" { + return tier.Value + } + } + return "" +} + func cloudStorageTest( testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory), ) func(*testing.T) { diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 32791d6232f6..85ae07ddddd2 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -124,7 +124,16 @@ type schemaChangeDetectedError struct { } func (e schemaChangeDetectedError) Error() string { - return fmt.Sprintf("schema change deteceted at %v", e.ts) + return fmt.Sprintf("schema change detected at %v", e.ts) +} + +type unsupportedSchemaChangeDetected struct { + desc string + ts hlc.Timestamp +} + +func (e unsupportedSchemaChangeDetected) Error() string { + return fmt.Sprintf("unsupported schema change %s detected at %s", e.desc, e.ts.AsOfSystemTime()) } type schemaFeed interface { @@ -214,7 +223,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) { boundaryType := jobspb.ResolvedSpan_BACKFILL if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop { boundaryType = jobspb.ResolvedSpan_EXIT - } else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isPrimaryKeyChange(events) { + } else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isRegionalByRowChange(events) { + // NOTE(ssd): The user is unlikely to see this + // error. The schemafeed will fail with an + // non-retriable error, meaning we likely + // return right after runUntilTableEvent + // above. + return unsupportedSchemaChangeDetected{ + desc: "SET REGIONAL BY ROW", + ts: highWater.Next(), + } + } else if err == nil && isPrimaryKeyChange(events) { boundaryType = jobspb.ResolvedSpan_RESTART } else if err != nil { return err @@ -245,6 +264,15 @@ func isPrimaryKeyChange(events []schemafeed.TableEvent) bool { return false } +func isRegionalByRowChange(events []schemafeed.TableEvent) bool { + for _, ev := range events { + if schemafeed.IsRegionalByRowChange(ev) { + return true + } + } + return false +} + func (f *kvFeed) scanIfShould( ctx context.Context, initialScan bool, highWater hlc.Timestamp, ) error { diff --git a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go index e8f8e368d607..03c2e23b3cc1 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go +++ b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go @@ -49,6 +49,17 @@ func MakeColumnDesc(id descpb.ColumnID) *descpb.ColumnDescriptor { } } +// SetLocalityRegionalByRow sets the LocalityConfig of the table +// descriptor such that desc.IsLocalityRegionalByRow will return true. +func SetLocalityRegionalByRow(desc catalog.TableDescriptor) catalog.TableDescriptor { + desc.TableDesc().LocalityConfig = &descpb.TableDescriptor_LocalityConfig{ + Locality: &descpb.TableDescriptor_LocalityConfig_RegionalByRow_{ + RegionalByRow: &descpb.TableDescriptor_LocalityConfig_RegionalByRow{}, + }, + } + return tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable() +} + // AddColumnDropBackfillMutation adds a mutation to desc to drop a column. // Yes, this does modify an immutable. func AddColumnDropBackfillMutation(desc catalog.TableDescriptor) catalog.TableDescriptor { diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go index 8473f31daebc..be4bf9a437cf 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter.go @@ -25,23 +25,26 @@ const ( tableEventTypeDropColumn tableEventTruncate tableEventPrimaryKeyChange + tableEventLocalityRegionalByRowChange ) var ( defaultTableEventFilter = tableEventFilter{ - tableEventTypeDropColumn: false, - tableEventTypeAddColumnWithBackfill: false, - tableEventTypeAddColumnNoBackfill: true, - tableEventTypeUnknown: true, - tableEventPrimaryKeyChange: false, + tableEventTypeDropColumn: false, + tableEventTypeAddColumnWithBackfill: false, + tableEventTypeAddColumnNoBackfill: true, + tableEventTypeUnknown: true, + tableEventPrimaryKeyChange: false, + tableEventLocalityRegionalByRowChange: false, } columnChangeTableEventFilter = tableEventFilter{ - tableEventTypeDropColumn: false, - tableEventTypeAddColumnWithBackfill: false, - tableEventTypeAddColumnNoBackfill: false, - tableEventTypeUnknown: true, - tableEventPrimaryKeyChange: false, + tableEventTypeDropColumn: false, + tableEventTypeAddColumnWithBackfill: false, + tableEventTypeAddColumnNoBackfill: false, + tableEventTypeUnknown: true, + tableEventPrimaryKeyChange: false, + tableEventLocalityRegionalByRowChange: false, } schemaChangeEventFilters = map[changefeedbase.SchemaChangeEventClass]tableEventFilter{ @@ -52,6 +55,11 @@ var ( func classifyTableEvent(e TableEvent) tableEventType { switch { + // Take care before changing the ordering here. Until we can + // classify events with multiple types, we need to ensure that + // we detect regionalByRow changes with priority. + case regionalByRowChanged(e): + return tableEventLocalityRegionalByRowChange case newColumnBackfillComplete(e): return tableEventTypeAddColumnWithBackfill case newColumnNoBackfill(e): @@ -135,8 +143,18 @@ func primaryKeyChanged(e TableEvent) bool { pkChangeMutationExists(e.Before) } +func regionalByRowChanged(e TableEvent) bool { + return e.Before.IsLocalityRegionalByRow() != e.After.IsLocalityRegionalByRow() +} + // IsPrimaryIndexChange returns true if the event corresponds to a change // in the primary index. func IsPrimaryIndexChange(e TableEvent) bool { return classifyTableEvent(e) == tableEventPrimaryKeyChange } + +// IsRegionalByRowChange returns true if the event corresponds to a +// change in the table's locality to or from RegionalByRow. +func IsRegionalByRowChange(e TableEvent) bool { + return classifyTableEvent(e) == tableEventLocalityRegionalByRowChange +} diff --git a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go index 76a0f759e7b3..f0dac93fa863 100644 --- a/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go @@ -21,6 +21,42 @@ import ( "github.com/stretchr/testify/require" ) +func TestTableEventIsRegionalByRowChange(t *testing.T) { + ts := func(seconds int) hlc.Timestamp { + return hlc.Timestamp{WallTime: (time.Duration(seconds) * time.Second).Nanoseconds()} + } + mkTableDesc := schematestutils.MakeTableDesc + addColBackfill := schematestutils.AddNewColumnBackfillMutation + setRBR := schematestutils.SetLocalityRegionalByRow + for _, c := range []struct { + name string + e TableEvent + exp bool + }{ + { + name: "regional by row change", + e: TableEvent{ + Before: mkTableDesc(42, 1, ts(2), 2), + After: setRBR(mkTableDesc(42, 2, ts(3), 2)), + }, + exp: true, + }, + { + name: "add non-NULL column", + e: TableEvent{ + Before: addColBackfill(mkTableDesc(42, 3, ts(2), 1)), + After: mkTableDesc(42, 4, ts(4), 2), + }, + exp: false, + }, + } { + t.Run(c.name, func(t *testing.T) { + require.Equal(t, c.exp, IsRegionalByRowChange(c.e)) + }) + } + +} + func TestTableEventFilter(t *testing.T) { ts := func(seconds int) hlc.Timestamp { return hlc.Timestamp{WallTime: (time.Duration(seconds) * time.Second).Nanoseconds()} @@ -28,6 +64,7 @@ func TestTableEventFilter(t *testing.T) { mkTableDesc := schematestutils.MakeTableDesc addColBackfill := schematestutils.AddNewColumnBackfillMutation dropColBackfill := schematestutils.AddColumnDropBackfillMutation + setRBR := schematestutils.SetLocalityRegionalByRow for _, c := range []struct { name string p tableEventFilter @@ -94,6 +131,15 @@ func TestTableEventFilter(t *testing.T) { }, exp: true, }, + { + name: "don't filter regional by row change", + p: defaultTableEventFilter, + e: TableEvent{ + Before: mkTableDesc(42, 1, ts(2), 2), + After: setRBR(mkTableDesc(42, 2, ts(3), 2)), + }, + exp: false, + }, { name: "columnChange - don't filter end of add NULL column", p: columnChangeTableEventFilter, @@ -103,6 +149,15 @@ func TestTableEventFilter(t *testing.T) { }, exp: false, }, + { + name: "columnChange - don't filter regional by row change", + p: columnChangeTableEventFilter, + e: TableEvent{ + Before: mkTableDesc(42, 1, ts(2), 2), + After: setRBR(mkTableDesc(42, 2, ts(3), 2)), + }, + exp: false, + }, } { t.Run(c.name, func(t *testing.T) { shouldFilter, err := c.p.shouldFilter(context.Background(), c.e)