Skip to content

Commit

Permalink
changefeed: detect regional by row changes in kv_feed
Browse files Browse the repository at this point in the history
This isn't strictly necessary as the schemafeed currently fails with
an unretriable error if it fails to validate the table descriptor and
regional by row tables will fail to validate.

However, it might be useful in general to be able to detect
unsupported schema migrations at this location since other schema
changes that we might want to disallow will not necessarily force a
table descriptor validation failure.

Release note: None
  • Loading branch information
stevendanna committed Apr 12, 2021
1 parent 54f6c6e commit a1bdf28
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 11 deletions.
30 changes: 29 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ func (e schemaChangeDetectedError) Error() string {
return fmt.Sprintf("schema change detected at %v", e.ts)
}

type unsupportedSchemaChangeDetected struct {
desc string
ts hlc.Timestamp
}

func (e unsupportedSchemaChangeDetected) Error() string {
return fmt.Sprintf("unsupported schema change %s detected at %s", e.desc, e.ts.AsOfSystemTime())
}

type schemaFeed interface {
Peek(ctx context.Context, atOrBefore hlc.Timestamp) (events []schemafeed.TableEvent, err error)
Pop(ctx context.Context, atOrBefore hlc.Timestamp) (events []schemafeed.TableEvent, err error)
Expand Down Expand Up @@ -214,7 +223,17 @@ func (f *kvFeed) run(ctx context.Context) (err error) {
boundaryType := jobspb.ResolvedSpan_BACKFILL
if f.schemaChangePolicy == changefeedbase.OptSchemaChangePolicyStop {
boundaryType = jobspb.ResolvedSpan_EXIT
} else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isPrimaryKeyChange(events) {
} else if events, err := f.tableFeed.Peek(ctx, highWater.Next()); err == nil && isRegionalByRowChange(events) {
// NOTE(ssd): The user is unlikely to see this
// error. The schemafeed will fail with an
// non-retriable error, meaning we likely
// return right after runUntilTableEvent
// above.
return unsupportedSchemaChangeDetected{
desc: "SET REGIONAL BY ROW",
ts: highWater.Next(),
}
} else if err == nil && isPrimaryKeyChange(events) {
boundaryType = jobspb.ResolvedSpan_RESTART
} else if err != nil {
return err
Expand Down Expand Up @@ -245,6 +264,15 @@ func isPrimaryKeyChange(events []schemafeed.TableEvent) bool {
return false
}

func isRegionalByRowChange(events []schemafeed.TableEvent) bool {
for _, ev := range events {
if schemafeed.IsRegionalByRowChange(ev) {
return true
}
}
return false
}

func (f *kvFeed) scanIfShould(
ctx context.Context, initialScan bool, highWater hlc.Timestamp,
) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ func MakeColumnDesc(id descpb.ColumnID) *descpb.ColumnDescriptor {
}
}

// SetLocalityRegionalByRow sets the LocalityConfig of the table
// descriptor such that desc.IsLocalityRegionalByRow will return true.
func SetLocalityRegionalByRow(desc catalog.TableDescriptor) catalog.TableDescriptor {
desc.TableDesc().LocalityConfig = &descpb.TableDescriptor_LocalityConfig{
Locality: &descpb.TableDescriptor_LocalityConfig_RegionalByRow_{
RegionalByRow: &descpb.TableDescriptor_LocalityConfig_RegionalByRow{},
},
}
return tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable()
}

// AddColumnDropBackfillMutation adds a mutation to desc to drop a column.
// Yes, this does modify an immutable.
func AddColumnDropBackfillMutation(desc catalog.TableDescriptor) catalog.TableDescriptor {
Expand Down
38 changes: 28 additions & 10 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,26 @@ const (
tableEventTypeDropColumn
tableEventTruncate
tableEventPrimaryKeyChange
tableEventLocalityRegionalByRowChange
)

var (
defaultTableEventFilter = tableEventFilter{
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: true,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: true,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
}

columnChangeTableEventFilter = tableEventFilter{
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: false,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventTypeDropColumn: false,
tableEventTypeAddColumnWithBackfill: false,
tableEventTypeAddColumnNoBackfill: false,
tableEventTypeUnknown: true,
tableEventPrimaryKeyChange: false,
tableEventLocalityRegionalByRowChange: false,
}

schemaChangeEventFilters = map[changefeedbase.SchemaChangeEventClass]tableEventFilter{
Expand All @@ -52,6 +55,11 @@ var (

func classifyTableEvent(e TableEvent) tableEventType {
switch {
// Take care before changing the ordering here. Until we can
// classify events with multiple types, we need to ensure that
// we detect regionalByRow changes with priority.
case regionalByRowChanged(e):
return tableEventLocalityRegionalByRowChange
case newColumnBackfillComplete(e):
return tableEventTypeAddColumnWithBackfill
case newColumnNoBackfill(e):
Expand Down Expand Up @@ -135,8 +143,18 @@ func primaryKeyChanged(e TableEvent) bool {
pkChangeMutationExists(e.Before)
}

func regionalByRowChanged(e TableEvent) bool {
return e.Before.IsLocalityRegionalByRow() != e.After.IsLocalityRegionalByRow()
}

// IsPrimaryIndexChange returns true if the event corresponds to a change
// in the primary index.
func IsPrimaryIndexChange(e TableEvent) bool {
return classifyTableEvent(e) == tableEventPrimaryKeyChange
}

// IsRegionalByRowChange returns true if the event corresponds to a
// change in the table's locality to or from RegionalByRow.
func IsRegionalByRowChange(e TableEvent) bool {
return classifyTableEvent(e) == tableEventLocalityRegionalByRowChange
}
55 changes: 55 additions & 0 deletions pkg/ccl/changefeedccl/schemafeed/table_event_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,50 @@ import (
"github.com/stretchr/testify/require"
)

func TestTableEventIsRegionalByRowChange(t *testing.T) {
ts := func(seconds int) hlc.Timestamp {
return hlc.Timestamp{WallTime: (time.Duration(seconds) * time.Second).Nanoseconds()}
}
mkTableDesc := schematestutils.MakeTableDesc
addColBackfill := schematestutils.AddNewColumnBackfillMutation
setRBR := schematestutils.SetLocalityRegionalByRow
for _, c := range []struct {
name string
e TableEvent
exp bool
}{
{
name: "regional by row change",
e: TableEvent{
Before: mkTableDesc(42, 1, ts(2), 2),
After: setRBR(mkTableDesc(42, 2, ts(3), 2)),
},
exp: true,
},
{
name: "add non-NULL column",
e: TableEvent{
Before: addColBackfill(mkTableDesc(42, 3, ts(2), 1)),
After: mkTableDesc(42, 4, ts(4), 2),
},
exp: false,
},
} {
t.Run(c.name, func(t *testing.T) {
require.Equal(t, c.exp, IsRegionalByRowChange(c.e))
})
}

}

func TestTableEventFilter(t *testing.T) {
ts := func(seconds int) hlc.Timestamp {
return hlc.Timestamp{WallTime: (time.Duration(seconds) * time.Second).Nanoseconds()}
}
mkTableDesc := schematestutils.MakeTableDesc
addColBackfill := schematestutils.AddNewColumnBackfillMutation
dropColBackfill := schematestutils.AddColumnDropBackfillMutation
setRBR := schematestutils.SetLocalityRegionalByRow
for _, c := range []struct {
name string
p tableEventFilter
Expand Down Expand Up @@ -94,6 +131,15 @@ func TestTableEventFilter(t *testing.T) {
},
exp: true,
},
{
name: "don't filter regional by row change",
p: defaultTableEventFilter,
e: TableEvent{
Before: mkTableDesc(42, 1, ts(2), 2),
After: setRBR(mkTableDesc(42, 2, ts(3), 2)),
},
exp: false,
},
{
name: "columnChange - don't filter end of add NULL column",
p: columnChangeTableEventFilter,
Expand All @@ -103,6 +149,15 @@ func TestTableEventFilter(t *testing.T) {
},
exp: false,
},
{
name: "columnChange - don't filter regional by row change",
p: columnChangeTableEventFilter,
e: TableEvent{
Before: mkTableDesc(42, 1, ts(2), 2),
After: setRBR(mkTableDesc(42, 2, ts(3), 2)),
},
exp: false,
},
} {
t.Run(c.name, func(t *testing.T) {
shouldFilter, err := c.p.shouldFilter(context.Background(), c.e)
Expand Down

0 comments on commit a1bdf28

Please sign in to comment.