Skip to content

Commit

Permalink
sql: validate check constraints with the schema changer
Browse files Browse the repository at this point in the history
Currently, constraints are added in the `Unvalidated` state, and are not
validated for existing rows until ALTER TABLE ... VALIDATE CONSTRAINT is run.
With this change, check constraints will be validated asynchronously after they
are added by default (and similar changes to FKs are to follow). This addresses
the problematic long-running transactions caused by the current implementation
of VALIDATE CONSTRAINT. This PR is a rework of cockroachdb#32504, and has the same tests.

With this change, check constraints will be added to the table descriptor in
the new `Validating` state, visible to CRUD operations, and a mutation is
queued indicating that the constraint is to be validated. During the backfill
step, the constraint is validated for existing rows. If validation succeeds,
then the constraint moves to the `Validated` state; otherwise, it is dropped.

The behavior when dropping constraints (either via DROP CONSTRAINT or
indirectly when a column is dropped) is unchanged: no mutation is enqueued.

As part of this change, check constraints can be added to non-public columns in
the process of being added, including columns that were created earlier in the
same transaction.

The main difference between this PR and cockroachdb#32504 is that cockroachdb#32504 does not add the
constraint to the table descriptor until it has been validated.

Release note (sql change): Check constraint adds by default will validate table
data with the added constraint asynchronously after the transaction commits.
  • Loading branch information
lucy-zhang committed Feb 12, 2019
1 parent ee130c0 commit cea639a
Show file tree
Hide file tree
Showing 16 changed files with 1,174 additions and 304 deletions.
26 changes: 23 additions & 3 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,23 @@ func (n *alterTableNode) startExec(params runParams) error {
}

case *tree.CheckConstraintTableDef:
// A previous command could have added a column which the new constraint uses,
// allocate IDs now.
if err != n.tableDesc.AllocateIDs() {
return err
}

ck, err := MakeCheckConstraint(params.ctx,
n.tableDesc, d, inuseNames, &params.p.semaCtx, n.n.Table)
if err != nil {
return err
}
ck.Validity = sqlbase.ConstraintValidity_Unvalidated
ck.Validity = sqlbase.ConstraintValidity_Validating
n.tableDesc.Checks = append(n.tableDesc.Checks, ck)
descriptorChanged = true

n.tableDesc.AddCheckValidationMutation(ck.Name)

case *tree.ForeignKeyConstraintTableDef:
for _, colName := range d.FromCols {
col, _, err := n.tableDesc.FindColumnByName(colName)
Expand Down Expand Up @@ -402,7 +410,11 @@ func (n *alterTableNode) startExec(params runParams) error {
for _, check := range n.tableDesc.Checks {
if used, err := check.UsesColumn(n.tableDesc.TableDesc(), col.ID); err != nil {
return err
} else if !used {
} else if used {
if check.Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("referencing constraint %q in the middle of being added, try again later", check.Name)
}
} else {
validChecks = append(validChecks, check)
}
}
Expand Down Expand Up @@ -452,6 +464,9 @@ func (n *alterTableNode) startExec(params runParams) error {
return fmt.Errorf("UNIQUE constraint depends on index %q, use DROP INDEX with CASCADE if you really want to drop it", t.Constraint)
case sqlbase.ConstraintTypeCheck:
for i := range n.tableDesc.Checks {
if n.tableDesc.Checks[i].Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("constraint %q in the middle of being added, try again later", t.Constraint)
}
if n.tableDesc.Checks[i].Name == name {
n.tableDesc.Checks = append(n.tableDesc.Checks[:i], n.tableDesc.Checks[i+1:]...)
descriptorChanged = true
Expand Down Expand Up @@ -498,9 +513,14 @@ func (n *alterTableNode) startExec(params runParams) error {
if !found {
panic("constraint returned by GetConstraintInfo not found")
}

if n.tableDesc.Checks[idx].Validity == sqlbase.ConstraintValidity_Validating {
return fmt.Errorf("constraint %q in the middle of being added, try again later", t.Constraint)
}

ck := n.tableDesc.Checks[idx]
if err := validateCheckExpr(
params.ctx, ck.Expr, &n.n.Table, n.tableDesc.TableDesc(), params.EvalContext(),
params.ctx, ck.Expr, n.tableDesc.TableDesc(), params.EvalContext().InternalExecutor, params.EvalContext().Txn,
); err != nil {
return err
}
Expand Down
168 changes: 167 additions & 1 deletion pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (sc *SchemaChanger) runBackfill(
var droppedIndexDescs []sqlbase.IndexDescriptor
var addedIndexDescs []sqlbase.IndexDescriptor

var checksToValidate []sqlbase.ConstraintToValidate

var tableDesc *sqlbase.TableDescriptor
if err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
var err error
Expand Down Expand Up @@ -156,6 +158,13 @@ func (sc *SchemaChanger) runBackfill(
}
case *sqlbase.DescriptorMutation_Index:
addedIndexDescs = append(addedIndexDescs, *t.Index)
case *sqlbase.DescriptorMutation_Constraint:
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToValidate_CHECK:
checksToValidate = append(checksToValidate, *t.Constraint)
default:
return errors.Errorf("unsupported constraint type: %d", t.Constraint.ConstraintType)
}
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand All @@ -168,6 +177,8 @@ func (sc *SchemaChanger) runBackfill(
if !sc.canClearRangeForDrop(t.Index) {
droppedIndexDescs = append(droppedIndexDescs, *t.Index)
}
case *sqlbase.DescriptorMutation_Constraint:
// no-op
default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand Down Expand Up @@ -198,9 +209,113 @@ func (sc *SchemaChanger) runBackfill(
}
}

// Validate check constraints.
if len(checksToValidate) > 0 {
if err := sc.validateChecks(ctx, evalCtx, lease, checksToValidate); err != nil {
return err
}
}
return nil
}

func (sc *SchemaChanger) validateChecks(
ctx context.Context,
evalCtx *extendedEvalContext,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
checks []sqlbase.ConstraintToValidate,
) error {
if testDisableTableLeases {
return nil
}
readAsOf := sc.clock.Now()
return sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
txn.SetFixedTimestamp(ctx, readAsOf)
tableDesc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
if err != nil {
return err
}

if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

grp := ctxgroup.WithContext(ctx)

// Notify when validation is finished (or has returned an error) for a check.
countDone := make(chan struct{}, len(checks))

for _, c := range checks {
grp.GoCtx(func(ctx context.Context) error {
defer func() { countDone <- struct{}{} }()

// Make the mutations public in a private copy of the descriptor
// and add it to the TableCollection, so that we can use SQL below to perform
// the validation. We wouldn't have needed to do this if we could have
// updated the descriptor and run validation in the same transaction. However,
// our current system is incapable of running long running schema changes
// (the validation can take many minutes). So we pretend that the schema
// has been updated and actually update it in a separate transaction that
// follows this one.
desc, err := sqlbase.NewImmutableTableDescriptor(*tableDesc).MakeFirstMutationPublic()
if err != nil {
return err
}
tc := &TableCollection{leaseMgr: sc.leaseMgr}
// pretend that the schema has been modified.
if err := tc.addUncommittedTable(*desc); err != nil {
return err
}

// Create a new eval context only because the eval context cannot be shared across many
// goroutines.
newEvalCtx := createSchemaChangeEvalCtx(ctx, readAsOf, evalCtx.Tracing, sc.ieFactory)
// TODO(vivek): This is not a great API. Leaving #34304 open.
ie := newEvalCtx.InternalExecutor.(*SessionBoundInternalExecutor)
ie.impl.tcModifier = tc
defer func() {
ie.impl.tcModifier = nil
}()

check, err := desc.FindCheckByName(c.Name)
if err != nil {
return err
}

return validateCheckExpr(ctx, check.Expr, desc.TableDesc(), ie, txn)
})
}

// Periodic schema change lease extension.
grp.GoCtx(func(ctx context.Context) error {
count := len(checks)
refreshTimer := timeutil.NewTimer()
defer refreshTimer.Stop()
refreshTimer.Reset(checkpointInterval)
for {
select {
case <-countDone:
count--
if count == 0 {
// Stop.
return nil
}

case <-refreshTimer.C:
refreshTimer.Read = true
refreshTimer.Reset(checkpointInterval)
if err := sc.ExtendLease(ctx, lease); err != nil {
return err
}

case <-ctx.Done():
return ctx.Err()
}
}
})
return grp.Wait()
})
}

func (sc *SchemaChanger) getTableVersion(
ctx context.Context, txn *client.Txn, tc *TableCollection, version sqlbase.DescriptorVersion,
) (*sqlbase.ImmutableTableDescriptor, error) {
Expand Down Expand Up @@ -701,11 +816,14 @@ func runSchemaChangesInTxn(
// Only needed because columnBackfillInTxn() backfills
// all column mutations.
doneColumnBackfill := false
// Checks are validated after all other mutations have been applied.
var checksToValidate []sqlbase.ConstraintToValidate

for _, m := range tableDesc.Mutations {
immutDesc := sqlbase.NewImmutableTableDescriptor(*tableDesc.TableDesc())
switch m.Direction {
case sqlbase.DescriptorMutation_ADD:
switch m.Descriptor_.(type) {
switch t := m.Descriptor_.(type) {
case *sqlbase.DescriptorMutation_Column:
if doneColumnBackfill || !sqlbase.ColumnNeedsBackfill(m.GetColumn()) {
break
Expand All @@ -720,6 +838,14 @@ func runSchemaChangesInTxn(
return err
}

case *sqlbase.DescriptorMutation_Constraint:
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToValidate_CHECK:
checksToValidate = append(checksToValidate, *t.Constraint)
default:
return errors.Errorf("unsupported constraint type: %d", t.Constraint.ConstraintType)
}

default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand All @@ -741,6 +867,9 @@ func runSchemaChangesInTxn(
return err
}

case *sqlbase.DescriptorMutation_Constraint:
return errors.Errorf("constraint validation mutation cannot be in the DROP state within the same transaction: %+v", m)

default:
return errors.Errorf("unsupported mutation: %+v", m)
}
Expand All @@ -752,6 +881,43 @@ func runSchemaChangesInTxn(
}
tableDesc.Mutations = nil

// Now that the table descriptor is in a valid state with all column and index
// mutations applied, it can be used for validating check constraints
return validateChecksInTxn(ctx, tc.leaseMgr, evalCtx, tableDesc, txn, checksToValidate)
}

// validateCheckInTxn validates check constraints within the provided
// transaction. The table descriptor that is passed in will be used for the
// InternalExecutor that performs the validation query.
func validateChecksInTxn(
ctx context.Context,
leaseMgr *LeaseManager,
evalCtx *tree.EvalContext,
tableDesc *MutableTableDescriptor,
txn *client.Txn,
checks []sqlbase.ConstraintToValidate,
) error {
newTc := &TableCollection{leaseMgr: leaseMgr}
// pretend that the schema has been modified.
if err := newTc.addUncommittedTable(*tableDesc); err != nil {
return err
}

ie := evalCtx.InternalExecutor.(*SessionBoundInternalExecutor)
ie.impl.tcModifier = newTc
defer func() {
ie.impl.tcModifier = nil
}()

for _, c := range checks {
check, err := tableDesc.FindCheckByName(c.Name)
if err != nil {
return err
}
if err := validateCheckExpr(ctx, check.Expr, tableDesc.TableDesc(), ie, txn); err != nil {
return err
}
}
return nil
}

Expand Down
11 changes: 7 additions & 4 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand All @@ -31,26 +32,28 @@ import (
func validateCheckExpr(
ctx context.Context,
exprStr string,
tableName tree.TableExpr,
tableDesc *sqlbase.TableDescriptor,
evalCtx *tree.EvalContext,
ie tree.SessionBoundInternalExecutor,
txn *client.Txn,
) error {
expr, err := parser.ParseExpr(exprStr)
if err != nil {
return err
}
// Construct AST and then convert to a string, to avoid problems with escaping the check expression
tblref := tree.TableRef{TableID: int64(tableDesc.ID), As: tree.AliasClause{Alias: "t"}}
sel := &tree.SelectClause{
Exprs: sqlbase.ColumnsSelectors(tableDesc.Columns, false /* forUpdateOrDelete */),
From: &tree.From{Tables: tree.TableExprs{tableName}},
From: &tree.From{Tables: []tree.TableExpr{&tblref}},
Where: &tree.Where{Type: tree.AstWhere, Expr: &tree.NotExpr{Expr: expr}},
}
lim := &tree.Limit{Count: tree.NewDInt(1)}
stmt := &tree.Select{Select: sel, Limit: lim}

queryStr := tree.AsStringWithFlags(stmt, tree.FmtParsable)
log.Infof(ctx, "Validating check constraint %q with query %q", expr.String(), queryStr)

rows, err := evalCtx.InternalExecutor.QueryRow(ctx, "validate check constraint", evalCtx.Txn, queryStr)
rows, err := ie.QueryRow(ctx, "validate check constraint", txn, queryStr)
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,9 @@ CREATE TABLE crdb_internal.schema_changes (
mutType = "INDEX"
targetID = tree.NewDInt(tree.DInt(int64(d.Index.ID)))
targetName = tree.NewDString(d.Index.Name)
case *sqlbase.DescriptorMutation_Constraint:
mutType = "CONSTRAINT VALIDATION"
targetName = tree.NewDString(d.Constraint.Name)
}
if err := addRow(
tableID,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,8 +1484,8 @@ func replaceVars(
return nil, true, expr
}

col, err := desc.FindActiveColumnByName(string(c.ColumnName))
if err != nil {
col, dropped, err := desc.FindColumnByName(c.ColumnName)
if err != nil || dropped {
return fmt.Errorf("column %q not found for constraint %q",
c.ColumnName, expr.String()), false, nil
}
Expand Down Expand Up @@ -1533,7 +1533,7 @@ func MakeCheckConstraint(
sort.Sort(sqlbase.ColumnIDs(colIDs))

sourceInfo := sqlbase.NewSourceInfoForSingleTable(
tableName, sqlbase.ResultColumnsFromColDescs(desc.Columns),
tableName, sqlbase.ResultColumnsFromColDescs(desc.TableDesc().AllNonDropColumns()),
)
sources := sqlbase.MultiSourceInfo{sourceInfo}

Expand Down
Loading

0 comments on commit cea639a

Please sign in to comment.