Skip to content

Commit

Permalink
Merge #88497
Browse files Browse the repository at this point in the history
88497: ttl: refactor storage param Setter to not modify TableDescriptor TTL settings directly r=rafiss a=ecwall

refs #88254

Changes the storage param Setter to modify UpdatedRowLevelTTL instead of the TableDescriptor directly so that the update can more easily be moved into a schema changer mutation if necessary.

Release note: None

Co-authored-by: Evan Wall <[email protected]>
  • Loading branch information
craig[bot] and ecwall committed Sep 23, 2022
2 parents 8c2b247 + 3662a45 commit 97fbd44
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 144 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/alter_column_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func AlterColumnType(
)
}
}
if err := schemaexpr.ValidateTTLExpressionDoesNotDependOnColumn(tableDesc, col); err != nil {
if err := schemaexpr.ValidateTTLExpressionDoesNotDependOnColumn(tableDesc, tableDesc.GetRowLevelTTL(), col); err != nil {
return err
}

Expand Down
108 changes: 55 additions & 53 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (n *alterTableNode) startExec(params runParams) error {
}
}

colDroppedViews, err := dropColumnImpl(params, tn, n.tableDesc, t)
colDroppedViews, err := dropColumnImpl(params, tn, n.tableDesc, n.tableDesc.GetRowLevelTTL(), t)
if err != nil {
return err
}
Expand Down Expand Up @@ -735,51 +735,48 @@ func (n *alterTableNode) startExec(params runParams) error {
}

case *tree.AlterTableSetStorageParams:
var ttlBefore *catpb.RowLevelTTL
if ttl := n.tableDesc.GetRowLevelTTL(); ttl != nil {
ttlBefore = protoutil.Clone(ttl).(*catpb.RowLevelTTL)
}
setter := tablestorageparam.NewSetter(n.tableDesc)
if err := storageparam.Set(
params.p.SemaCtx(),
params.EvalContext(),
t.StorageParams,
tablestorageparam.NewSetter(n.tableDesc),
setter,
); err != nil {
return err
}

descriptorChanged = true

if err := handleTTLStorageParamChange(
var err error
err = handleTTLStorageParamChange(
params,
tn,
n.tableDesc,
ttlBefore,
n.tableDesc.GetRowLevelTTL(),
); err != nil {
setter.TableDesc,
setter.UpdatedRowLevelTTL,
)
if err != nil {
return err
}

case *tree.AlterTableResetStorageParams:
var ttlBefore *catpb.RowLevelTTL
if ttl := n.tableDesc.GetRowLevelTTL(); ttl != nil {
ttlBefore = protoutil.Clone(ttl).(*catpb.RowLevelTTL)
}
setter := tablestorageparam.NewSetter(n.tableDesc)
if err := storageparam.Reset(
params.EvalContext(),
t.Params,
tablestorageparam.NewSetter(n.tableDesc),
setter,
); err != nil {
return err
}
descriptorChanged = true

if err := handleTTLStorageParamChange(
var err error
err = handleTTLStorageParamChange(
params,
tn,
n.tableDesc,
ttlBefore,
n.tableDesc.GetRowLevelTTL(),
); err != nil {
setter.TableDesc,
setter.UpdatedRowLevelTTL,
)
if err != nil {
return err
}

Expand Down Expand Up @@ -1550,7 +1547,11 @@ func (p *planner) updateFKBackReferenceName(
}

func dropColumnImpl(
params runParams, tn *tree.TableName, tableDesc *tabledesc.Mutable, t *tree.AlterTableDropColumn,
params runParams,
tn *tree.TableName,
tableDesc *tabledesc.Mutable,
rowLevelTTL *catpb.RowLevelTTL,
t *tree.AlterTableDropColumn,
) (droppedViews []string, err error) {
if tableDesc.IsLocalityRegionalByRow() {
rbrColName, err := tableDesc.GetRegionalByRowTableRegionColumnName()
Expand Down Expand Up @@ -1658,7 +1659,7 @@ func dropColumnImpl(
if err := schemaexpr.ValidateColumnHasNoDependents(tableDesc, colToDrop); err != nil {
return nil, err
}
if err := schemaexpr.ValidateTTLExpressionDoesNotDependOnColumn(tableDesc, colToDrop); err != nil {
if err := schemaexpr.ValidateTTLExpressionDoesNotDependOnColumn(tableDesc, rowLevelTTL, colToDrop); err != nil {
return nil, err
}

Expand Down Expand Up @@ -1845,12 +1846,12 @@ func dropColumnImpl(
}

func handleTTLStorageParamChange(
params runParams,
tn *tree.TableName,
tableDesc *tabledesc.Mutable,
before, after *catpb.RowLevelTTL,
params runParams, tn *tree.TableName, tableDesc *tabledesc.Mutable, after *catpb.RowLevelTTL,
) error {
// update existing config

before := tableDesc.GetRowLevelTTL()

// Update existing config.
if before != nil && after != nil {

// Update cron schedule if required.
Expand Down Expand Up @@ -1910,11 +1911,12 @@ func handleTTLStorageParamChange(
}
}

// create new column
hasTTLMutation := false

// Create new column.
if (before == nil || !before.HasDurationExpr()) && (after != nil && after.HasDurationExpr()) {
// Adding a TTL requires adding the automatic column and deferring the TTL
// addition to after the column is successfully added.
tableDesc.RowLevelTTL = nil
if _, err := tableDesc.FindColumnWithName(colinfo.TTLDefaultExpirationColumnName); err == nil {
return pgerror.Newf(
pgcode.InvalidTableDefinition,
Expand Down Expand Up @@ -1947,45 +1949,45 @@ func handleTTLStorageParamChange(
&descpb.ModifyRowLevelTTL{RowLevelTTL: after},
descpb.DescriptorMutation_ADD,
)
hasTTLMutation = true
version := params.ExecCfg().Settings.Version.ActiveVersion(params.ctx)
if err := tableDesc.AllocateIDs(params.ctx, version); err != nil {
return err
}
}

// remove existing column
// Remove existing column.
if (before != nil && before.HasDurationExpr()) && (after == nil || !after.HasDurationExpr()) {
telemetry.Inc(sqltelemetry.RowLevelTTLDropped)

if before.HasDurationExpr() {
// Keep the TTL from beforehand, but create the DROP COLUMN job and the
// associated mutation.
droppedViews, err := dropColumnImpl(params, tn, tableDesc, &tree.AlterTableDropColumn{
Column: colinfo.TTLDefaultExpirationColumnName,
})
if err != nil {
return err
}
// This should never happen as we do not CASCADE, but error again just in case.
if len(droppedViews) > 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "cannot drop TTL automatic column if it is depended on by a view")
}
tableDesc.RowLevelTTL = before

tableDesc.AddModifyRowLevelTTLMutation(
&descpb.ModifyRowLevelTTL{RowLevelTTL: before},
descpb.DescriptorMutation_DROP,
)
// Create the DROP COLUMN job and the associated mutation.
droppedViews, err := dropColumnImpl(params, tn, tableDesc, after, &tree.AlterTableDropColumn{
Column: colinfo.TTLDefaultExpirationColumnName,
})
if err != nil {
return err
}
// This should never happen as we do not CASCADE, but error again just in case.
if len(droppedViews) > 0 {
return pgerror.Newf(pgcode.InvalidParameterValue, "cannot drop TTL automatic column if it is depended on by a view")
}
tableDesc.AddModifyRowLevelTTLMutation(
&descpb.ModifyRowLevelTTL{RowLevelTTL: before},
descpb.DescriptorMutation_DROP,
)
hasTTLMutation = true
}

// Validate the type and volatility of ttl_expiration_expression.
if after != nil && after.HasExpirationExpr() {
if err := schemaexpr.ValidateTTLExpirationExpression(params.ctx, tableDesc, params.p.SemaCtx(), tn); err != nil {
if after != nil {
if err := schemaexpr.ValidateTTLExpirationExpression(params.ctx, tableDesc, params.p.SemaCtx(), tn, after); err != nil {
return err
}
}

if !hasTTLMutation {
tableDesc.RowLevelTTL = after
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/schemaexpr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/parser",
Expand Down
15 changes: 5 additions & 10 deletions pkg/sql/catalog/schemaexpr/expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
Expand Down Expand Up @@ -459,15 +460,12 @@ func SanitizeVarFreeExpr(
// ValidateTTLExpressionDoesNotDependOnColumn verifies that the
// ttl_expiration_expression, if any, does not reference the given column.
func ValidateTTLExpressionDoesNotDependOnColumn(
tableDesc catalog.TableDescriptor, col catalog.Column,
tableDesc catalog.TableDescriptor, rowLevelTTL *catpb.RowLevelTTL, col catalog.Column,
) error {
if !tableDesc.HasRowLevelTTL() {
return nil
}
expirationExpr := tableDesc.GetRowLevelTTL().ExpirationExpr
if expirationExpr == "" {
if rowLevelTTL == nil || !rowLevelTTL.HasExpirationExpr() {
return nil
}
expirationExpr := rowLevelTTL.ExpirationExpr
expr, err := parser.ParseExpr(string(expirationExpr))
if err != nil {
// At this point, we should be able to parse the expiration expression.
Expand Down Expand Up @@ -497,12 +495,9 @@ func ValidateTTLExpirationExpression(
tableDesc catalog.TableDescriptor,
semaCtx *tree.SemaContext,
tableName *tree.TableName,
ttl *catpb.RowLevelTTL,
) error {
if !tableDesc.HasRowLevelTTL() {
return nil
}

ttl := tableDesc.GetRowLevelTTL()
if !ttl.HasExpirationExpr() {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2605,7 +2605,8 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string {
appendStorageParam := func(key, value string) {
storageParams = append(storageParams, key+spacing+`=`+spacing+value)
}
if ttl := desc.GetRowLevelTTL(); ttl != nil {
if desc.HasRowLevelTTL() {
ttl := desc.GetRowLevelTTL()
appendStorageParam(`ttl`, `'on'`)
if ttl.HasDurationExpr() {
appendStorageParam(`ttl_expire_after`, string(ttl.DurationExpr))
Expand Down
62 changes: 35 additions & 27 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1324,14 +1324,18 @@ func NewTableDesc(
id, dbID, sc.GetID(), n.Table.Table(), creationTime, privileges, persistence,
)

setter := tablestorageparam.NewSetter(&desc)
if err := storageparam.Set(
semaCtx,
evalCtx,
n.StorageParams,
tablestorageparam.NewSetter(&desc),
setter,
); err != nil {
return nil, err
}
if updatedRowLevelTTL := setter.UpdatedRowLevelTTL; updatedRowLevelTTL != nil {
setter.TableDesc.RowLevelTTL = updatedRowLevelTTL
}

indexEncodingVersion := descpb.StrictIndexColumnIDGuaranteesVersion
isRegionalByRow := n.Locality != nil && n.Locality.LocalityLevel == tree.LocalityLevelRow
Expand Down Expand Up @@ -1466,34 +1470,37 @@ func NewTableDesc(
}

// Create the TTL automatic column (crdb_internal_expiration) if one does not already exist.
if ttl := desc.GetRowLevelTTL(); ttl != nil && ttl.HasDurationExpr() {
hasRowLevelTTLColumn := false
for _, def := range n.Defs {
switch def := def.(type) {
case *tree.ColumnTableDef:
if def.Name == colinfo.TTLDefaultExpirationColumnName {
// If we find the column, make sure it has the expected type.
if def.Type.SQLString() != types.TimestampTZ.SQLString() {
return nil, pgerror.Newf(
pgcode.InvalidTableDefinition,
`table %s has TTL defined, but column %s is not a %s`,
def.Name,
colinfo.TTLDefaultExpirationColumnName,
types.TimestampTZ.SQLString(),
)
if desc.HasRowLevelTTL() {
ttl := desc.GetRowLevelTTL()
if ttl.HasDurationExpr() {
hasRowLevelTTLColumn := false
for _, def := range n.Defs {
switch def := def.(type) {
case *tree.ColumnTableDef:
if def.Name == colinfo.TTLDefaultExpirationColumnName {
// If we find the column, make sure it has the expected type.
if def.Type.SQLString() != types.TimestampTZ.SQLString() {
return nil, pgerror.Newf(
pgcode.InvalidTableDefinition,
`table %s has TTL defined, but column %s is not a %s`,
def.Name,
colinfo.TTLDefaultExpirationColumnName,
types.TimestampTZ.SQLString(),
)
}
hasRowLevelTTLColumn = true
break
}
hasRowLevelTTLColumn = true
break
}
}
}
if !hasRowLevelTTLColumn {
col, err := rowLevelTTLAutomaticColumnDef(ttl)
if err != nil {
return nil, err
if !hasRowLevelTTLColumn {
col, err := rowLevelTTLAutomaticColumnDef(ttl)
if err != nil {
return nil, err
}
n.Defs = append(n.Defs, col)
cdd = append(cdd, nil)
}
n.Defs = append(n.Defs, col)
cdd = append(cdd, nil)
}
}

Expand Down Expand Up @@ -2347,8 +2354,9 @@ func newTableDesc(
}

// Row level TTL tables require a scheduled job to be created as well.
if ttl := ret.RowLevelTTL; ttl != nil {
if err := schemaexpr.ValidateTTLExpirationExpression(params.ctx, ret, params.p.SemaCtx(), &n.Table); err != nil {
if ret.HasRowLevelTTL() {
ttl := ret.GetRowLevelTTL()
if err := schemaexpr.ValidateTTLExpirationExpression(params.ctx, ret, params.p.SemaCtx(), &n.Table, ttl); err != nil {
return nil, err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func (sc *SchemaChanger) maybeUpdateScheduledJobsForRowLevelTTL(
ctx context.Context, tableDesc catalog.TableDescriptor,
) error {
// Drop the scheduled job if one exists and the table descriptor is being dropped.
if tableDesc.Dropped() && tableDesc.GetRowLevelTTL() != nil {
if tableDesc.Dropped() && tableDesc.HasRowLevelTTL() {
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
scheduleID := tableDesc.GetRowLevelTTL().ScheduleID
if scheduleID > 0 {
Expand Down Expand Up @@ -1554,8 +1554,8 @@ func (sc *SchemaChanger) done(ctx context.Context) error {
scTable.RowLevelTTL.ScheduleID = j.ScheduleID()
}
} else if m.Dropped() {
if ttl := scTable.RowLevelTTL; ttl != nil {
if err := DeleteSchedule(ctx, sc.execCfg, txn, ttl.ScheduleID); err != nil {
if scTable.HasRowLevelTTL() {
if err := DeleteSchedule(ctx, sc.execCfg, txn, scTable.GetRowLevelTTL().ScheduleID); err != nil {
return err
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/storageparam/tablestorageparam/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_library(
"//pkg/sql/storageparam",
"//pkg/util/duration",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand Down
Loading

0 comments on commit 97fbd44

Please sign in to comment.