Skip to content

Commit

Permalink
sql: async validation of foreign keys in ADD CONSTRAINT
Browse files Browse the repository at this point in the history
Currently, validating an unvalidated foreign key constraint requires running
`VALIDATE CONSTRAINT`, which executes a potentially long-running query in the
user transaction. In this PR, `ADD CONSTRAINT` for foreign keys is now
processed by the schema changer, with the validation for existing rows running
in a separate transaction from the original user transaction.

The foreign key mutation (represented by the new `FOREIGN_KEY` enum value in
`ConstraintToUpdate`) is processed in the same way as check constraint
mutations: the foreign key is in the mutations list while other columns and
indexes with the same mutation ID are backfilled, then added to the appropriate
index in the `Validating` state before being validated, and is finalized when
the validation query for existing rows completes successfully. If validation
fails, the transaction is rolled back, with the foreign key (and backreference)
removed from the table descriptor(s) as part of the rollback.

Adding foreign keys to columns or indexes being added is still not supported
and is left for later work. Also unsupported is adding a foreign key constraint
in the same transaction as `CREATE TABLE` that is either validated or that
rolls back the entire transaction on failure. In this PR, the constraint is
just left unvalidated; this needs a follow-up PR to queue a separate mutation
for validating the constraint after it's been added.

Release note (sql change): Foreign keys that are added to an existing table
using `ALTER TABLE` will now be validated for existing rows, with improved
performance compared to running `ADD CONSTRAINT` followed by `VALIDATE
CONSTRAINT` previously.
  • Loading branch information
lucy-zhang committed May 9, 2019
1 parent 89d3f07 commit 55cc506
Show file tree
Hide file tree
Showing 16 changed files with 825 additions and 861 deletions.
35 changes: 16 additions & 19 deletions pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,56 +491,53 @@ func (n *alterTableNode) startExec(params runParams) error {
switch constraint.Kind {
case sqlbase.ConstraintTypeCheck:
found := false
var idx int
for idx = range n.tableDesc.Checks {
if n.tableDesc.Checks[idx].Name == name {
var ck *sqlbase.TableDescriptor_CheckConstraint
for _, c := range n.tableDesc.Checks {
// If the constraint is still being validated, don't allow VALIDATE CONSTRAINT to run
if c.Name == name && c.Validity != sqlbase.ConstraintValidity_Validating {
found = true
ck = c
break
}
}
if !found {
return pgerror.Newf(pgerror.CodeObjectNotInPrerequisiteStateError,
"constraint %q in the middle of being added, try again later", t.Constraint)
}

ck := n.tableDesc.Checks[idx]
if err := validateCheckExpr(
params.ctx, ck.Expr, n.tableDesc.TableDesc(), params.EvalContext().InternalExecutor, params.EvalContext().Txn,
); err != nil {
return err
}
n.tableDesc.Checks[idx].Validity = sqlbase.ConstraintValidity_Validated
descriptorChanged = true
ck.Validity = sqlbase.ConstraintValidity_Validated

case sqlbase.ConstraintTypeFK:
found := false
var id sqlbase.IndexID
var fkIdx *sqlbase.IndexDescriptor
for _, idx := range n.tableDesc.AllNonDropIndexes() {
if idx.ForeignKey.IsSet() && idx.ForeignKey.Name == name {
fk := &idx.ForeignKey
// If the constraint is still being validated, don't allow VALIDATE CONSTRAINT to run
if fk.IsSet() && fk.Name == name && fk.Validity != sqlbase.ConstraintValidity_Validating {
found = true
id = idx.ID
fkIdx = idx
break
}
}
if !found {
return pgerror.AssertionFailedf(
"constraint returned by GetConstraintInfo not found")
}
idx, err := n.tableDesc.FindIndexByID(id)
if err != nil {
return pgerror.NewAssertionErrorWithWrappedErrf(err, "")
return pgerror.Newf(pgerror.CodeObjectNotInPrerequisiteStateError,
"constraint %q in the middle of being added, try again later", t.Constraint)
}
if err := params.p.validateForeignKey(params.ctx, n.tableDesc.TableDesc(), idx); err != nil {
if err := validateForeignKey(params.ctx, n.tableDesc.TableDesc(), fkIdx, params.EvalContext().InternalExecutor, params.EvalContext().Txn); err != nil {
return err
}
idx.ForeignKey.Validity = sqlbase.ConstraintValidity_Validated
descriptorChanged = true
fkIdx.ForeignKey.Validity = sqlbase.ConstraintValidity_Validated

default:
return pgerror.Newf(pgerror.CodeWrongObjectTypeError,
"constraint %q of relation %q is not a foreign key or check constraint",
tree.ErrString(&t.Constraint), tree.ErrString(&n.n.Table))
}
descriptorChanged = true

case tree.ColumnMutationCmd:
// Column mutations
Expand Down
191 changes: 144 additions & 47 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -125,8 +126,8 @@ func (sc *SchemaChanger) runBackfill(
var droppedIndexDescs []sqlbase.IndexDescriptor
var addedIndexSpans []roachpb.Span

var addedChecks []*sqlbase.TableDescriptor_CheckConstraint
var checksToValidate []sqlbase.ConstraintToUpdate
var constraintsToAdd []sqlbase.ConstraintToUpdate
var constraintsToValidate []sqlbase.ConstraintToUpdate

tableDesc, err := sc.updateJobRunningStatus(ctx, RunningStatusBackfill)
if err != nil {
Expand Down Expand Up @@ -157,14 +158,8 @@ func (sc *SchemaChanger) runBackfill(
case *sqlbase.DescriptorMutation_Index:
addedIndexSpans = append(addedIndexSpans, tableDesc.IndexSpan(t.Index.ID))
case *sqlbase.DescriptorMutation_Constraint:
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
addedChecks = append(addedChecks, &t.Constraint.Check)
checksToValidate = append(checksToValidate, *t.Constraint)
default:
return pgerror.AssertionFailedf(
"unsupported constraint type: %d", log.Safe(t.Constraint.ConstraintType))
}
constraintsToAdd = append(constraintsToAdd, *t.Constraint)
constraintsToValidate = append(constraintsToValidate, *t.Constraint)
default:
return pgerror.AssertionFailedf(
"unsupported mutation: %+v", m)
Expand Down Expand Up @@ -216,7 +211,7 @@ func (sc *SchemaChanger) runBackfill(
}
}

// Add check constraints, publish the new version of the table descriptor,
// Add check and foreign key constraints, publish the new version of the table descriptor,
// and wait until the entire cluster is on the new version. This is basically
// a state transition for the schema change, which must happen after the
// columns are backfilled and before constraint validation begins. This
Expand All @@ -225,44 +220,63 @@ func (sc *SchemaChanger) runBackfill(
// a constraint references both public and non-public columns), and 2) the
// validation occurs only when the entire cluster is already enforcing the
// constraint on insert/update.
if len(addedChecks) > 0 {
if err := sc.addChecks(ctx, addedChecks); err != nil {
if len(constraintsToAdd) > 0 {
if err := sc.AddConstraints(ctx, constraintsToAdd); err != nil {
return err
}
}

// Validate check constraints.
if len(checksToValidate) > 0 {
if err := sc.validateChecks(ctx, evalCtx, lease, checksToValidate); err != nil {
// Validate check and foreign key constraints.
if len(constraintsToValidate) > 0 {
if err := sc.validateConstraints(ctx, evalCtx, lease, constraintsToValidate); err != nil {
return err
}
}
return nil
}

// addChecks publishes a new version of the given table descriptor with the
// AddConstraints publishes a new version of the given table descriptor with the
// given check and foreign key constraints added to it, and waits until the entire cluster is on
// the new version of the table descriptor.
func (sc *SchemaChanger) addChecks(
ctx context.Context, addedChecks []*sqlbase.TableDescriptor_CheckConstraint,
func (sc *SchemaChanger) AddConstraints(
ctx context.Context, constraints []sqlbase.ConstraintToUpdate,
) error {
_, err := sc.leaseMgr.Publish(ctx, sc.tableID,
func(desc *sqlbase.MutableTableDescriptor) error {
for i, added := range addedChecks {
found := false
for _, c := range desc.Checks {
if c.Name == added.Name {
log.VEventf(
ctx, 2,
"backfiller tried to add constraint %+v but found existing constraint %+v, presumably due to a retry",
added, c,
)
found = true
break
for i, added := range constraints {
switch added.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
found := false
for _, c := range desc.Checks {
if c.Name == added.Name {
log.VEventf(
ctx, 2,
"backfiller tried to add constraint %+v but found existing constraint %+v, presumably due to a retry",
added, c,
)
found = true
break
}
}
if !found {
desc.Checks = append(desc.Checks, &constraints[i].Check)
}
case sqlbase.ConstraintToUpdate_FOREIGN_KEY:
idx, err := desc.FindIndexByID(added.ForeignKeyIndex)
if err != nil {
return err
}
if idx.ForeignKey.IsSet() {
if log.V(2) {
log.VEventf(
ctx, 2,
"backfiller tried to add constraint %+v but found existing constraint %+v, presumably due to a retry",
added, idx.ForeignKey,
)
}
} else {
idx.ForeignKey = added.ForeignKey
}
}
if !found {
desc.Checks = append(desc.Checks, addedChecks[i])
}
}
return nil
Expand All @@ -275,11 +289,11 @@ func (sc *SchemaChanger) addChecks(
return sc.waitToUpdateLeases(ctx, sc.tableID)
}

func (sc *SchemaChanger) validateChecks(
func (sc *SchemaChanger) validateConstraints(
ctx context.Context,
evalCtx *extendedEvalContext,
lease *sqlbase.TableDescriptor_SchemaChangeLease,
checks []sqlbase.ConstraintToUpdate,
constraints []sqlbase.ConstraintToUpdate,
) error {
if testDisableTableLeases {
return nil
Expand Down Expand Up @@ -311,10 +325,10 @@ func (sc *SchemaChanger) validateChecks(
grp := ctxgroup.WithContext(ctx)

// Notify when validation is finished (or has returned an error) for a check.
countDone := make(chan struct{}, len(checks))
countDone := make(chan struct{}, len(constraints))

for i := range checks {
c := checks[i]
for i := range constraints {
c := constraints[i]
grp.GoCtx(func(ctx context.Context) error {
defer func() { countDone <- struct{}{} }()

Expand All @@ -333,13 +347,25 @@ func (sc *SchemaChanger) validateChecks(
// Create a new eval context only because the eval context cannot be shared across many
// goroutines.
newEvalCtx := createSchemaChangeEvalCtx(ctx, readAsOf, evalCtx.Tracing, sc.ieFactory)
return validateCheckInTxn(ctx, sc.leaseMgr, &newEvalCtx.EvalContext, desc, txn, &c.Name)
switch c.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
if err := validateCheckInTxn(ctx, sc.leaseMgr, &newEvalCtx.EvalContext, desc, txn, c.Name); err != nil {
return err
}
case sqlbase.ConstraintToUpdate_FOREIGN_KEY:
if err := validateFkInTxn(ctx, sc.leaseMgr, &newEvalCtx.EvalContext, desc, txn, c.Name); err != nil {
return err
}
default:
return errors.Errorf("unsupported constraint type: %d", c.ConstraintType)
}
return nil
})
}

// Periodic schema change lease extension.
grp.GoCtx(func(ctx context.Context) error {
count := len(checks)
count := len(constraints)
refreshTimer := timeutil.NewTimer()
defer refreshTimer.Stop()
refreshTimer.Reset(checkpointInterval)
Expand Down Expand Up @@ -1114,7 +1140,7 @@ func runSchemaChangesInTxn(
// all column mutations.
doneColumnBackfill := false
// Checks are validated after all other mutations have been applied.
var checksToValidate []sqlbase.ConstraintToUpdate
var constraintsToValidate []sqlbase.ConstraintToUpdate

for _, m := range tableDesc.Mutations {
immutDesc := sqlbase.NewImmutableTableDescriptor(*tableDesc.TableDesc())
Expand All @@ -1139,11 +1165,17 @@ func runSchemaChangesInTxn(
switch t.Constraint.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
tableDesc.Checks = append(tableDesc.Checks, &t.Constraint.Check)
checksToValidate = append(checksToValidate, *t.Constraint)
case sqlbase.ConstraintToUpdate_FOREIGN_KEY:
idx, err := tableDesc.FindIndexByID(t.Constraint.ForeignKeyIndex)
if err != nil {
return err
}
idx.ForeignKey = t.Constraint.ForeignKey
default:
return pgerror.AssertionFailedf(
"unsupported constraint type: %d", log.Safe(t.Constraint.ConstraintType))
}
constraintsToValidate = append(constraintsToValidate, *t.Constraint)

default:
return pgerror.AssertionFailedf(
Expand Down Expand Up @@ -1184,9 +1216,33 @@ func runSchemaChangesInTxn(

// Now that the table descriptor is in a valid state with all column and index
// mutations applied, it can be used for validating check constraints
for _, c := range checksToValidate {
if err := validateCheckInTxn(ctx, tc.leaseMgr, evalCtx, tableDesc, txn, &c.Name); err != nil {
return err
for _, c := range constraintsToValidate {
switch c.ConstraintType {
case sqlbase.ConstraintToUpdate_CHECK:
if err := validateCheckInTxn(ctx, tc.leaseMgr, evalCtx, tableDesc, txn, c.Name); err != nil {
return err
}
case sqlbase.ConstraintToUpdate_FOREIGN_KEY:
// We can't support adding a validated foreign key constraint in the same
// transaction as the CREATE TABLE statement. This would require adding
// the backreference to the other table and then validating the constraint
// for whatever rows were inserted into the referencing table in this
// transaction, which requires multiple schema changer states across
// multiple transactions.
// TODO (lucy): Add a validation job that runs after the user transaction.
// This won't roll back the original transaction if validation fails, but
// it will at least leave the constraint in the Validated state if
// validation succeeds.

// For now, revert the constraint to an unvalidated state.
idx, err := tableDesc.FindIndexByID(c.ForeignKeyIndex)
if err != nil {
return err
}
idx.ForeignKey.Validity = sqlbase.ConstraintValidity_Unvalidated
default:
return pgerror.AssertionFailedf(
"unsupported constraint type: %d", log.Safe(c.ConstraintType))
}
}
return nil
Expand All @@ -1201,7 +1257,7 @@ func validateCheckInTxn(
evalCtx *tree.EvalContext,
tableDesc *MutableTableDescriptor,
txn *client.Txn,
checkName *string,
checkName string,
) error {
newTc := &TableCollection{leaseMgr: leaseMgr}
// pretend that the schema has been modified.
Expand All @@ -1215,13 +1271,54 @@ func validateCheckInTxn(
ie.impl.tcModifier = nil
}()

check, err := tableDesc.FindCheckByName(*checkName)
check, err := tableDesc.FindCheckByName(checkName)
if err != nil {
return err
}
return validateCheckExpr(ctx, check.Expr, tableDesc.TableDesc(), ie, txn)
}

// validateFkInTxn validates the foreign key constraint named fkName on
// tableDesc, running the validation through the internal executor taken
// from evalCtx and the supplied transaction.
//
// The table descriptor is registered as an uncommitted table in a fresh
// TableCollection, which is installed as the internal executor's tcModifier
// for the duration of the call and cleared again on return.
//
// The constraint is looked up first on the primary index and then on
// tableDesc.Indexes; an error is returned if no index carries a foreign key
// with the given name. The function panics if evalCtx.InternalExecutor is
// not a *SessionBoundInternalExecutor.
func validateFkInTxn(
	ctx context.Context,
	leaseMgr *LeaseManager,
	evalCtx *tree.EvalContext,
	tableDesc *MutableTableDescriptor,
	txn *client.Txn,
	fkName string,
) error {
	// Pretend that the schema has been modified by handing the executor a
	// collection that already contains this version of the descriptor.
	uncommitted := &TableCollection{leaseMgr: leaseMgr}
	if err := uncommitted.addUncommittedTable(*tableDesc); err != nil {
		return err
	}

	executor := evalCtx.InternalExecutor.(*SessionBoundInternalExecutor)
	executor.impl.tcModifier = uncommitted
	defer func() { executor.impl.tcModifier = nil }()

	// hasNamedFK reports whether idx carries the foreign key being validated.
	hasNamedFK := func(idx *sqlbase.IndexDescriptor) bool {
		return idx.ForeignKey.IsSet() && idx.ForeignKey.Name == fkName
	}

	// Prefer the primary index; otherwise scan the secondary indexes in order
	// and take the first match.
	target := &tableDesc.PrimaryIndex
	if !hasNamedFK(target) {
		target = nil
		for i := range tableDesc.Indexes {
			if candidate := &tableDesc.Indexes[i]; hasNamedFK(candidate) {
				target = candidate
				break
			}
		}
		if target == nil {
			return fmt.Errorf("foreign key %s does not exist", fkName)
		}
	}

	return validateForeignKey(ctx, tableDesc.TableDesc(), target, executor, txn)
}

// columnBackfillInTxn backfills columns for all mutation columns in
// the mutation list.
func columnBackfillInTxn(
Expand Down
Loading

0 comments on commit 55cc506

Please sign in to comment.