Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: implement ADD/DROP schema changes for row-level TTL #76216

Merged
merged 8 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
690 changes: 407 additions & 283 deletions pkg/sql/alter_table.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error {
}
} else if mvRefresh := m.AsMaterializedViewRefresh(); mvRefresh != nil {
viewToRefresh = mvRefresh
} else if m.AsPrimaryKeySwap() != nil || m.AsComputedColumnSwap() != nil {
} else if m.AsPrimaryKeySwap() != nil || m.AsComputedColumnSwap() != nil || m.AsModifyRowLevelTTL() != nil {
// The backfiller doesn't need to do anything here.
} else {
return errors.AssertionFailedf("unsupported mutation: %+v", m)
Expand All @@ -280,7 +280,7 @@ func (sc *SchemaChanger) runBackfill(ctx context.Context) error {
// no-op. Handled in (*schemaChanger).done by queueing an index gc job.
} else if c := m.AsConstraint(); c != nil {
constraintsToDrop = append(constraintsToDrop, c)
} else if m.AsPrimaryKeySwap() != nil || m.AsComputedColumnSwap() != nil || m.AsMaterializedViewRefresh() != nil {
} else if m.AsPrimaryKeySwap() != nil || m.AsComputedColumnSwap() != nil || m.AsMaterializedViewRefresh() != nil || m.AsModifyRowLevelTTL() != nil {
// The backfiller doesn't need to do anything here.
} else {
return errors.AssertionFailedf("unsupported mutation: %+v", m)
Expand Down Expand Up @@ -1962,7 +1962,7 @@ func runSchemaChangesInTxn(
immutDesc := tabledesc.NewBuilder(tableDesc.TableDesc()).BuildImmutableTable()

if m.Adding() {
if m.AsPrimaryKeySwap() != nil {
if m.AsPrimaryKeySwap() != nil || m.AsModifyRowLevelTTL() != nil {
// Don't need to do anything here, as the call to MakeMutationComplete
// will perform the steps for this operation.
} else if m.AsComputedColumnSwap() != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/catalog/catpb/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,19 @@ message PartitioningDescriptor {
repeated List list = 2 [(gogoproto.nullable) = false];
repeated Range range = 3 [(gogoproto.nullable) = false];
}

// RowLevelTTL represents the TTL configured on a table.
message RowLevelTTL {
option (gogoproto.equal) = true;

// DurationExpr is the automatically assigned interval for when the TTL should apply to a row.
optional string duration_expr = 1 [(gogoproto.nullable)=false];
// SelectBatchSize is the amount of rows that should be fetched at a time
optional int64 select_batch_size = 2 [(gogoproto.nullable)=false];
// DeleteBatchSize is the amount of rows that should be deleted at a time.
optional int64 delete_batch_size = 3 [(gogoproto.nullable)=false];
// DeletionCron signifies how often the TTL deletion job runs in a cron format.
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
}
25 changes: 10 additions & 15 deletions pkg/sql/catalog/descpb/structured.proto
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,14 @@ message PrimaryKeySwap {
optional LocalityConfigSwap locality_config_swap = 6;
}

// ModifyRowLevelTTL is a mutation corresponding to adding or dropping a TTL
// from a table. This accompanies an ADD or DROP column.
message ModifyRowLevelTTL {
option (gogoproto.equal) = true;

optional cockroach.sql.catalog.catpb.RowLevelTTL row_level_ttl = 1 [(gogoproto.customname) = "RowLevelTTL"];
}

// ComputedColumnSwap is a mutation corresponding to the atomic swap phase
// where Column a' that is computed using Column a is swapped to replace
// Column a while Column a becomes computed using a'.
Expand Down Expand Up @@ -618,6 +626,7 @@ message DescriptorMutation {
PrimaryKeySwap primaryKeySwap = 9;
ComputedColumnSwap computedColumnSwap = 10;
MaterializedViewRefresh materializedViewRefresh = 11;
ModifyRowLevelTTL modify_row_level_ttl = 12 [(gogoproto.customname)="ModifyRowLevelTTL"];
}
// A descriptor within a mutation is unavailable for reads, writes
// and deletes. It is only available for implicit (internal to
Expand Down Expand Up @@ -1170,22 +1179,8 @@ message TableDescriptor {
// from the PARTITION ALL BY clause.
optional bool partition_all_by = 44 [(gogoproto.nullable)=false];

message RowLevelTTL {
option (gogoproto.equal) = true;

// DurationExpr is the automatically assigned interval for when the TTL should apply to a row.
optional string duration_expr = 1 [(gogoproto.nullable)=false];
// SelectBatchSize is the amount of rows that should be fetched at a time
optional int64 select_batch_size = 2 [(gogoproto.nullable)=false];
// DeleteBatchSize is the amount of rows that should be deleted at a time.
optional int64 delete_batch_size = 3 [(gogoproto.nullable)=false];
// DeletionCron signifies how often the TTL deletion job runs in a cron format.
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
}
// RowLevelTTL is set if there is a TTL set on the table.
optional RowLevelTTL row_level_ttl = 47 [(gogoproto.customname)="RowLevelTTL"];
optional cockroach.sql.catalog.catpb.RowLevelTTL row_level_ttl = 47 [(gogoproto.customname)="RowLevelTTL"];

// ExcludeDataFromBackup specifies if the table's row data can be excluded
// from a backup targeting this table. This in turn means that the protected
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ type TableDescriptor interface {
// REGIONAL BY ROW table.
GetRegionalByRowTableRegionColumnName() (tree.Name, error)
// GetRowLevelTTL returns the row-level TTL config for the table.
GetRowLevelTTL() *descpb.TableDescriptor_RowLevelTTL
GetRowLevelTTL() *catpb.RowLevelTTL
// HasRowLevelTTL returns where there is a row-level TTL config for the table.
HasRowLevelTTL() bool
// GetExcludeDataFromBackup returns true if the table's row data is configured
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/catalog/table_elements.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type Mutation interface {
// if the mutation is a materialized view refresh, nil otherwise.
AsMaterializedViewRefresh() MaterializedViewRefresh

// AsModifyRowLevelTTL returns the corresponding ModifyRowLevelTTL
// if the mutation is a row-level TTL alter, nil otherwise.
AsModifyRowLevelTTL() ModifyRowLevelTTL

// NOTE: When adding new types of mutations to this interface, be sure to
// audit the code which unpacks and introspects mutations to be sure to add
// cases for the new type.
Expand Down Expand Up @@ -459,6 +463,14 @@ type MaterializedViewRefresh interface {
TableWithNewIndexes(tbl TableDescriptor) TableDescriptor
}

// ModifyRowLevelTTL is an interface around a modify row level TTL mutation.
type ModifyRowLevelTTL interface {
TableElementMaybeMutation

// RowLevelTTL returns the row level TTL for the mutation.
RowLevelTTL() *catpb.RowLevelTTL
}

// Partitioning is an interface around an index partitioning.
type Partitioning interface {

Expand Down
40 changes: 33 additions & 7 deletions pkg/sql/catalog/tabledesc/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package tabledesc

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
Expand Down Expand Up @@ -158,6 +159,17 @@ func (c constraintToUpdate) GetConstraintID() descpb.ConstraintID {
panic("unknown constraint type")
}

// modifyRowLevelTTL implements the catalog.ModifyRowLevelTTL interface.
type modifyRowLevelTTL struct {
maybeMutation
desc *descpb.ModifyRowLevelTTL
}

// RowLevelTTL contains the row level TTL config to add or remove.
func (c modifyRowLevelTTL) RowLevelTTL() *catpb.RowLevelTTL {
return c.desc.RowLevelTTL
}

// primaryKeySwap implements the catalog.PrimaryKeySwap interface.
type primaryKeySwap struct {
maybeMutation
Expand Down Expand Up @@ -292,13 +304,14 @@ func (c materializedViewRefresh) TableWithNewIndexes(
// mutation implements the
type mutation struct {
maybeMutation
column catalog.Column
index catalog.Index
constraint catalog.ConstraintToUpdate
pkSwap catalog.PrimaryKeySwap
ccSwap catalog.ComputedColumnSwap
mvRefresh catalog.MaterializedViewRefresh
mutationOrdinal int
column catalog.Column
index catalog.Index
constraint catalog.ConstraintToUpdate
pkSwap catalog.PrimaryKeySwap
ccSwap catalog.ComputedColumnSwap
mvRefresh catalog.MaterializedViewRefresh
modifyRowLevelTTL catalog.ModifyRowLevelTTL
mutationOrdinal int
}

// AsColumn returns the corresponding Column if the mutation is on a column,
Expand All @@ -325,6 +338,12 @@ func (m mutation) AsPrimaryKeySwap() catalog.PrimaryKeySwap {
return m.pkSwap
}

// AsModifyRowLevelTTL returns the corresponding ModifyRowLevelTTL if the
// mutation is a computed column swap, nil otherwise.
func (m mutation) AsModifyRowLevelTTL() catalog.ModifyRowLevelTTL {
return m.modifyRowLevelTTL
}

// AsComputedColumnSwap returns the corresponding ComputedColumnSwap if the
// mutation is a computed column swap, nil otherwise.
func (m mutation) AsComputedColumnSwap() catalog.ComputedColumnSwap {
Expand Down Expand Up @@ -366,6 +385,7 @@ func newMutationCache(desc *descpb.TableDescriptor) *mutationCache {
var pkSwaps []primaryKeySwap
var ccSwaps []computedColumnSwap
var mvRefreshes []materializedViewRefresh
var modifyRowLevelTTLs []modifyRowLevelTTL
for i, m := range desc.Mutations {
mm := maybeMutation{
mutationID: m.MutationID,
Expand Down Expand Up @@ -415,6 +435,12 @@ func newMutationCache(desc *descpb.TableDescriptor) *mutationCache {
desc: pb,
})
backingStructs[i].mvRefresh = &mvRefreshes[len(mvRefreshes)-1]
} else if pb := m.GetModifyRowLevelTTL(); pb != nil {
modifyRowLevelTTLs = append(modifyRowLevelTTLs, modifyRowLevelTTL{
maybeMutation: mm,
desc: pb,
})
backingStructs[i].modifyRowLevelTTL = &modifyRowLevelTTLs[len(modifyRowLevelTTLs)-1]
}
}
// Populate the c.all slice with Mutation interfaces.
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -1985,6 +1985,17 @@ func (desc *Mutable) AddNotNullMutation(
desc.addMutation(m)
}

// AddModifyRowLevelTTLMutation adds a row-level TTL mutation to descs.Mutations.
func (desc *Mutable) AddModifyRowLevelTTLMutation(
ttl *descpb.ModifyRowLevelTTL, direction descpb.DescriptorMutation_Direction,
) {
m := descpb.DescriptorMutation{
Descriptor_: &descpb.DescriptorMutation_ModifyRowLevelTTL{ModifyRowLevelTTL: ttl},
Direction: direction,
}
desc.addMutation(m)
}

// AddColumnMutation adds a column mutation to desc.Mutations. Callers must take
// care not to further mutate the column descriptor, since this method retains
// a pointer to it.
Expand Down Expand Up @@ -2400,7 +2411,7 @@ func (desc *wrapper) GetRegionalByRowTableRegionColumnName() (tree.Name, error)
}

// GetRowLevelTTL implements the TableDescriptor interface.
func (desc *wrapper) GetRowLevelTTL() *descpb.TableDescriptor_RowLevelTTL {
func (desc *wrapper) GetRowLevelTTL() *catpb.RowLevelTTL {
return desc.RowLevelTTL
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/tabledesc/ttl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
package tabledesc

import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/robfig/cron/v3"
)

// ValidateRowLevelTTL validates that the TTL options are valid.
func ValidateRowLevelTTL(ttl *descpb.TableDescriptor_RowLevelTTL) error {
func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
if ttl == nil {
return nil
}
Expand Down
29 changes: 27 additions & 2 deletions pkg/sql/catalog/tabledesc/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,11 @@ func validateMutation(m *descpb.DescriptorMutation) error {
return errors.AssertionFailedf(
"materialized view refresh mutation in state %s, direction %s", errors.Safe(m.State), errors.Safe(m.Direction))
}
case *descpb.DescriptorMutation_ModifyRowLevelTTL:
if m.Direction == descpb.DescriptorMutation_NONE {
return errors.AssertionFailedf(
"modify row level TTL mutation in state %s, direction %s", errors.Safe(m.State), errors.Safe(m.Direction))
}
default:
return errors.AssertionFailedf(
"mutation in state %s, direction %s, and no column/index descriptor",
Expand Down Expand Up @@ -548,13 +553,15 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
desc.validateConstraintIDs(vea)
}

// Ensure that mutations cannot be queued if a primary key change or
// an alter column type schema change has either been started in
// Ensure that mutations cannot be queued if a primary key change, TTL change
// or an alter column type schema change has either been started in
// this transaction, or is currently in progress.
var alterPKMutation descpb.MutationID
var alterColumnTypeMutation descpb.MutationID
var modifyTTLMutation descpb.MutationID
var foundAlterPK bool
var foundAlterColumnType bool
var foundModifyTTL bool

for _, m := range desc.Mutations {
// If we have seen an alter primary key mutation, then
Expand Down Expand Up @@ -587,6 +594,20 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
}
return
}
if foundModifyTTL {
if modifyTTLMutation == m.MutationID {
vea.Report(pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot perform other schema changes in the same transaction as a TTL mutation",
))
} else {
vea.Report(pgerror.Newf(
pgcode.FeatureNotSupported,
"cannot perform a schema change operation while a TTL change is in progress",
))
}
return
}
if m.GetPrimaryKeySwap() != nil {
foundAlterPK = true
alterPKMutation = m.MutationID
Expand All @@ -595,6 +616,10 @@ func (desc *wrapper) ValidateSelf(vea catalog.ValidationErrorAccumulator) {
foundAlterColumnType = true
alterColumnTypeMutation = m.MutationID
}
if m.GetModifyRowLevelTTL() != nil {
foundModifyTTL = true
modifyTTLMutation = m.MutationID
}
}

// Validate that the presence of MutationJobs (from the old schema changer)
Expand Down
15 changes: 9 additions & 6 deletions pkg/sql/control_schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
Expand Down Expand Up @@ -97,12 +98,14 @@ func updateSchedule(params runParams, schedule *jobs.ScheduledJob) error {
}

// deleteSchedule deletes specified schedule.
func deleteSchedule(params runParams, scheduleID int64) error {
env := JobSchedulerEnv(params.ExecCfg())
_, err := params.ExecCfg().InternalExecutor.ExecEx(
params.ctx,
func deleteSchedule(
ctx context.Context, execCfg *ExecutorConfig, txn *kv.Txn, scheduleID int64,
) error {
env := JobSchedulerEnv(execCfg)
_, err := execCfg.InternalExecutor.ExecEx(
ctx,
"delete-schedule",
params.EvalContext().Txn,
txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf(
"DELETE FROM %s WHERE schedule_id = $1",
Expand Down Expand Up @@ -161,7 +164,7 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
return errors.Wrap(err, "failed to run OnDrop")
}
}
err = deleteSchedule(params, schedule.ScheduleID())
err = deleteSchedule(params.ctx, params.ExecCfg(), params.p.txn, schedule.ScheduleID())
default:
err = errors.AssertionFailedf("unhandled command %s", n.command)
}
Expand Down
Loading