Skip to content

Commit

Permalink
Merge #63306
Browse files Browse the repository at this point in the history
63306: changefeedccl: disallow changfeeds on regional by row tables r=miretskiy a=stevendanna

To fully support changefeeds, we need to make a number of technical-
and product-level decisions and changes. Until those changes are made,
we want to disable changefeeds on regional by row tables.

This change ads a new check in the table validator that will prevent
new changefeeds from being started on a table where
IsLocalityRegionalByRow() is true.

Further, existing feeds will fails with a non-retriable error.

Fixes #63239

Release note (enterprise change): Changefeeds will now fail on any
regional by row table with an error such as:

    CHANGEFEED cannot target REGIONAL BY ROW tables: TABLE_NAME

This is to prevent unexpected changefeed behavior until we can offer
full support for regional by row tables.

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Apr 13, 2021
2 parents cf88523 + a1bdf28 commit 650ede7
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 16 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ go_test(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/importccl",
"//pkg/ccl/multiregionccl",
"//pkg/ccl/partitionccl",
"//pkg/ccl/storageccl",
"//pkg/ccl/utilccl",
"//pkg/clusterversion",
Expand Down
73 changes: 72 additions & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
// Imported to allow locality-related table mutations
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -67,6 +70,8 @@ import (
"github.com/stretchr/testify/require"
)

var testServerRegion = "us-east-1"

func TestChangefeedBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1175,6 +1180,55 @@ func TestChangefeedAuthorization(t *testing.T) {
t.Run(`enterprise`, enterpriseTest(testFn))
}

func TestChangefeedFailOnRBRChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

rbrErrorRegex := regexp.MustCompile(`CHANGEFEED cannot target REGIONAL BY ROW tables: rbr`)
assertRBRError := func(ctx context.Context, f cdctest.TestFeed) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
done := make(chan struct{})
go func() {
if _, err := f.Next(); err != nil {
assert.Regexp(t, rbrErrorRegex, err)
done <- struct{}{}
}
}()
select {
case <-ctx.Done():
t.Fatal("timed out waiting for changefeed to fail")
case <-done:
}
}
testFn := func(t *testing.T, db *gosql.DB, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, "SET CLUSTER SETTING kv.closed_timestamp.target_duration = '50ms'")
t.Run("regional by row", func(t *testing.T) {
sqlDB.Exec(t, `CREATE TABLE rbr (a INT PRIMARY KEY, b INT)`)
defer sqlDB.Exec(t, `DROP TABLE rbr`)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (0, NULL)`)
rbr := feed(t, f, `CREATE CHANGEFEED FOR rbr `)
defer closeFeed(t, rbr)
sqlDB.Exec(t, `INSERT INTO rbr VALUES (1, 2)`)
assertPayloads(t, rbr, []string{
`rbr: [0]->{"after": {"a": 0, "b": null}}`,
`rbr: [1]->{"after": {"a": 1, "b": 2}}`,
})
sqlDB.Exec(t, `ALTER TABLE rbr SET LOCALITY REGIONAL BY ROW`)
assertRBRError(context.Background(), rbr)
})
}
withTestServerRegion := func(args *base.TestServerArgs) {
args.Locality.Tiers = append(args.Locality.Tiers, roachpb.Tier{
Key: "region",
Value: testServerRegion,
})
}
t.Run(`sinkless`, sinklessTestWithServerArgs(withTestServerRegion, testFn))
t.Run("enterprise", enterpriseTestWithServerArgs(withTestServerRegion, testFn))
}

func TestChangefeedStopOnSchemaChange(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1963,7 +2017,14 @@ func TestChangefeedErrors(t *testing.T) {
defer log.Scope(t).Close(t)

ctx := context.Background()
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{})
s, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{
Key: "region",
Value: testServerRegion,
}},
},
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b STRING)`)
Expand Down Expand Up @@ -2043,6 +2104,16 @@ func TestChangefeedErrors(t *testing.T) {
t, `CHANGEFEED cannot target views: vw`,
`EXPERIMENTAL CHANGEFEED FOR vw`,
)

// Regional by row tables are not currently supported
sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE defaultdb PRIMARY REGION "%s"`, testServerRegion))
sqlDB.Exec(t, `CREATE TABLE test_cdc_rbr_fails (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `ALTER TABLE test_cdc_rbr_fails SET LOCALITY REGIONAL BY ROW`)
sqlDB.ExpectErr(
t, `CHANGEFEED cannot target REGIONAL BY ROW tables: test_cdc_rbr_fails`,
`CREATE CHANGEFEED FOR test_cdc_rbr_fails`,
)

// Backup has the same bad error message #28170.
sqlDB.ExpectErr(
t, `"information_schema.tables" does not exist`,
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc
if tableDesc.IsSequence() {
return errors.Errorf(`CHANGEFEED cannot target sequences: %s`, tableDesc.GetName())
}
if tableDesc.IsLocalityRegionalByRow() {
return errors.Errorf(`CHANGEFEED cannot target REGIONAL BY ROW tables: %s`, tableDesc.GetName())
}
if len(tableDesc.GetFamilies()) != 1 {
return errors.Errorf(
`CHANGEFEEDs are currently supported on tables with exactly 1 column family: %s has %d`,
Expand Down
24 changes: 21 additions & 3 deletions pkg/ccl/changefeedccl/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
// Imported to allow locality-related table mutations
_ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl"
_ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
Expand Down Expand Up @@ -246,7 +249,7 @@ func expectResolvedTimestampAvro(
return parseTimeToHLC(t, resolved.(map[string]interface{})[`string`].(string))
}

func sinlesttTestWithServerArgs(
func sinklessTestWithServerArgs(
argsFn func(args *base.TestServerArgs),
testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory),
) func(*testing.T) {
Expand Down Expand Up @@ -280,7 +283,9 @@ func sinlesttTestWithServerArgs(
// that we'll still use the row-by-row engine, see #55605).
sqlDB.Exec(t, `SET CLUSTER SETTING sql.defaults.vectorize=on`)
sqlDB.Exec(t, `CREATE DATABASE d`)

if region := serverArgsRegion(args); region != "" {
sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region))
}
sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()
f := cdctest.MakeSinklessFeedFactory(s, sink)
Expand All @@ -289,7 +294,7 @@ func sinlesttTestWithServerArgs(
}

func sinklessTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) {
return sinlesttTestWithServerArgs(nil, testFn)
return sinklessTestWithServerArgs(nil, testFn)
}

func enterpriseTest(testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory)) func(*testing.T) {
Expand Down Expand Up @@ -330,6 +335,10 @@ func enterpriseTestWithServerArgs(
sqlDB.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '1s'`)
sqlDB.Exec(t, `SET CLUSTER SETTING changefeed.experimental_poll_interval = '10ms'`)
sqlDB.Exec(t, `CREATE DATABASE d`)

if region := serverArgsRegion(args); region != "" {
sqlDB.Exec(t, fmt.Sprintf(`ALTER DATABASE d PRIMARY REGION "%s"`, region))
}
sink, cleanup := sqlutils.PGUrl(t, s.ServingSQLAddr(), t.Name(), url.User(security.RootUser))
defer cleanup()
f := cdctest.MakeTableFeedFactory(s, db, flushCh, sink)
Expand All @@ -338,6 +347,15 @@ func enterpriseTestWithServerArgs(
}
}

func serverArgsRegion(args base.TestServerArgs) string {
for _, tier := range args.Locality.Tiers {
if tier.Key == "region" {
return tier.Value
}
}
return ""
}

func cloudStorageTest(
testFn func(*testing.T, *gosql.DB, cdctest.TestFeedFactory),
) func(*testing.T) {
Expand Down
32 changes: 30 additions & 2 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,16 @@ type schemaChangeDetectedError struct {
}

func (e schemaChangeDetectedError) Error() string {
return fmt.Sprintf("schema change deteceted at %v", e.ts)
return fmt.Sprintf("schema change detected at %v", e.ts)
}

type unsupportedSchemaChangeDetected struct {
desc string
ts hlc.Timestamp
}

func (e unsupportedSchemaChangeDetected) Error() string {
return fmt.Sprintf("unsupported schema change %s detected at %s", e.desc, e.ts.AsOfSystemTime())
}

type schemaFeed interface {
Expand Down Expand Up @@ -214,7 +223,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
boundaryType := jobspb.ResolvedSpan_BACKFILL
if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
boundaryType = jobspb.ResolvedSpan_EXIT
} else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isPrimaryKeyChange(events) {
} else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isRegionalByRowChange(events) {
// NOTE(ssd): The user is unlikely to see this
// error. The schemafeed will fail with an
// non-retriable error, meaning we likely
// return right after runUntilTableEvent
// above.
return unsupportedSchemaChangeDetected{
desc: "SET REGIONAL BY ROW",
ts: highWater.Next(),
}
} else if err == nil && isPrimaryKeyChange(events) {
boundaryType = jobspb.ResolvedSpan_RESTART
} else if err != nil {
return err
Expand Down Expand Up @@ -245,6 +264,15 @@ func isPrimaryKeyChange(events []schemafeed.TableEvent) bool {
return false
}

func isRegionalByRowChange(events []schemafeed.TableEvent) bool {
for _, ev := range events {
if schemafeed.IsRegionalByRowChange(ev) {
return true
}
}
return false
}

func (f *kvFeed) scanIfShould(
ctx context.Context, initialScan bool, highWater hlc.Timestamp,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ func MakeColumnDesc(id descpb.ColumnID) *descpb.ColumnDescriptor {
}
}

// SetLocalityRegionalByRow sets the LocalityConfig of the table
// descriptor such that desc.IsLocalityRegionalByRow will return true.
func SetLocalityRegionalByRow(desc catalog.TableDescriptor) catalog.TableDescriptor {
desc.TableDesc().LocalityConfig = &descpb.TableDescriptor_LocalityConfig{
Locality: &descpb.TableDescriptor_LocalityConfig_RegionalByRow_{
RegionalByRow: &descpb.TableDescriptor_LocalityConfig_RegionalByRow{},
},
}
return tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable()
}

// AddColumnDropBackfillMutation adds a mutation to desc to drop a column.
// Yes, this does modify an immutable.
func AddColumnDropBackfillMutation(desc catalog.TableDescriptor) catalog.TableDescriptor {
Expand Down
38 changes: 28 additions & 10 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,26 @@ const (
tableEventTypeDropColumn
tableEventTruncate
tableEventPrimaryKeyChange
tableEventLocalityRegionalByRowChange
)

var (
defaultTableEventFilter = tableEventFilter{
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: true,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: true,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
}

columnChangeTableEventFilter = tableEventFilter{
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: false,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: false,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
}

schemaChangeEventFilters = map[changefeedbase.SchemaChangeEventClass]tableEventFilter{
Expand All @@ -52,6 +55,11 @@ var (

func classifyTableEvent(e TableEvent) tableEventType {
switch {
// Take care before changing the ordering here. Until we can
// classify events with multiple types, we need to ensure that
// we detect regionalByRow changes with priority.
case regionalByRowChanged(e):
return tableEventLocalityRegionalByRowChange
case newColumnBackfillComplete(e):
return tableEventTypeAddColumnWithBackfill
case newColumnNoBackfill(e):
Expand Down Expand Up @@ -135,8 +143,18 @@ func primaryKeyChanged(e TableEvent) bool {
pkChangeMutationExists(e.Before)
}

func regionalByRowChanged(e TableEvent) bool {
return e.Before.IsLocalityRegionalByRow() != e.After.IsLocalityRegionalByRow()
}

// IsPrimaryIndexChange returns true if the event corresponds to a change
// in the primary index.
func IsPrimaryIndexChange(e TableEvent) bool {
return classifyTableEvent(e) == tableEventPrimaryKeyChange
}

// IsRegionalByRowChange returns true if the event corresponds to a
// change in the table's locality to or from RegionalByRow.
func IsRegionalByRowChange(e TableEvent) bool {
return classifyTableEvent(e) == tableEventLocalityRegionalByRowChange
}
Loading

0 comments on commit 650ede7

Please sign in to comment.