Skip to content

Commit

Permalink
sql: add support for NOT VALID Foreign Key constraints
Browse files Browse the repository at this point in the history
  • Loading branch information
Tyler314 committed Jul 3, 2019
1 parent c552725 commit eb16f9f
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 20 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ type delayedFK struct {
func addDelayedFKs(ctx context.Context, defs []delayedFK, resolver fkResolver) error {
for _, def := range defs {
if err := sql.ResolveFK(
ctx, nil, resolver, def.tbl, def.def, map[sqlbase.ID]*sqlbase.MutableTableDescriptor{}, sql.NewTable,
ctx, nil, resolver, def.tbl, def.def, map[sqlbase.ID]*sqlbase.MutableTableDescriptor{}, sql.NewTable, tree.ValidationDefault,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func readPostgresCreateTable(
continue
}
for _, constraint := range constraints {
if err := sql.ResolveFK(evalCtx.Ctx(), nil /* txn */, fks.resolver, desc, constraint, backrefs, sql.NewTable); err != nil {
if err := sql.ResolveFK(evalCtx.Ctx(), nil /* txn */, fks.resolver, desc, constraint, backrefs, sql.NewTable, tree.ValidationDefault); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (n *alterTableNode) startExec(params runParams) error {
} else {
tableState = NonEmptyTable
}
err = params.p.resolveFK(params.ctx, n.tableDesc, d, affected, tableState)
err = params.p.resolveFK(params.ctx, n.tableDesc, d, affected, tableState, t.ValidationBehavior)
})
if err != nil {
return err
Expand Down
16 changes: 11 additions & 5 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ func (p *planner) resolveFK(
d *tree.ForeignKeyConstraintTableDef,
backrefs map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
ts FKTableState,
validationState tree.ValidationBehavior,
) error {
return ResolveFK(ctx, p.txn, p, tbl, d, backrefs, ts)
return ResolveFK(ctx, p.txn, p, tbl, d, backrefs, ts, validationState)
}

func qualifyFKColErrorWithDB(
Expand Down Expand Up @@ -421,8 +422,8 @@ const (
// It may, in doing so, add to or alter descriptors in the passed in `backrefs`
// map of other tables that need to be updated when this table is created.
// Constraints that are not known to hold for existing data are created
// "unvalidated", but when table is empty (e.g. during creation), no existing
// data imples no existing violations, and thus the constraint can be created
// "unvalidated", but when table is empty (e.g. during creating on), no existing
// data implies no existing violations, and thus the constraint can be created
// without the unvalidated flag.
//
// The caller should pass an instance of fkSelfResolver as
Expand All @@ -445,6 +446,7 @@ func ResolveFK(
d *tree.ForeignKeyConstraintTableDef,
backrefs map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
ts FKTableState,
validationState tree.ValidationBehavior,
) error {
for _, col := range d.FromCols {
col, _, err := tbl.FindColumnByName(col)
Expand Down Expand Up @@ -586,7 +588,11 @@ func ResolveFK(
}

if ts != NewTable {
ref.Validity = sqlbase.ConstraintValidity_Validating
if validationState == tree.ValidationSkip {
ref.Validity = sqlbase.ConstraintValidity_Unvalidated
} else {
ref.Validity = sqlbase.ConstraintValidity_Validating
}
}
backref := sqlbase.ForeignKeyReference{Table: tbl.ID}

Expand Down Expand Up @@ -1282,7 +1288,7 @@ func MakeTableDesc(
desc.Checks = append(desc.Checks, ck)

case *tree.ForeignKeyConstraintTableDef:
if err := ResolveFK(ctx, txn, fkResolver, &desc, d, affected, NewTable); err != nil {
if err := ResolveFK(ctx, txn, fkResolver, &desc, d, affected, NewTable, tree.ValidationDefault); err != nil {
return desc, err
}

Expand Down
36 changes: 36 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/fk
Original file line number Diff line number Diff line change
Expand Up @@ -2101,3 +2101,39 @@ DELETE FROM t1 WHERE x IS NULL

statement ok
DROP TABLE t2, t2 CASCADE

statement ok
CREATE TABLE person (id INT PRIMARY KEY, age INT, name STRING)

statement ok
CREATE TABLE pet (id INT PRIMARY KEY, name STRING)

statement ok
INSERT INTO pet VALUES (0, 'crookshanks')

statement error pq: foreign key violation: "pet" row id=0 has no match in "person"
ALTER TABLE pet ADD CONSTRAINT fk_constraint FOREIGN KEY (id) REFERENCES person (id)

statement ok
ALTER TABLE pet ADD CONSTRAINT fk_constraint FOREIGN KEY (id) REFERENCES person (id) NOT VALID

query TTTTB
SHOW CONSTRAINTS FROM pet
----
pet fk_constraint FOREIGN KEY FOREIGN KEY (id) REFERENCES person(id) false
pet primary PRIMARY KEY PRIMARY KEY (id ASC) true

statement error pq: foreign key violation: "pet" row id=0 has no match in "person"
ALTER TABLE pet VALIDATE CONSTRAINT fk_constraint

statement ok
INSERT INTO person VALUES (0, 18, 'Hermione Granger')

statement ok
ALTER TABLE pet VALIDATE CONSTRAINT fk_constraint

query TTTTB
SHOW CONSTRAINTS FROM pet
----
pet fk_constraint FOREIGN KEY FOREIGN KEY (id) REFERENCES person(id) true
pet primary PRIMARY KEY PRIMARY KEY (id ASC) true
85 changes: 74 additions & 11 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,13 +1302,52 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
isRollback := false
jobSucceeded := true
now := timeutil.Now().UnixNano()
return sc.leaseMgr.Publish(ctx, sc.tableID, func(desc *sqlbase.MutableTableDescriptor) error {

// Get the other tables whose foreign key backreferences need to be removed.
var fksByBackrefTable map[sqlbase.ID][]*sqlbase.ConstraintToUpdate
err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
fksByBackrefTable = make(map[sqlbase.ID][]*sqlbase.ConstraintToUpdate)

desc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
if err != nil {
return err
}
for _, mutation := range desc.Mutations {
if mutation.MutationID != sc.mutationID {
break
}
if constraint := mutation.GetConstraint(); constraint != nil &&
constraint.ConstraintType == sqlbase.ConstraintToUpdate_FOREIGN_KEY &&
mutation.Direction == sqlbase.DescriptorMutation_ADD &&
constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Unvalidated {
fk := &constraint.ForeignKey
if fk.Table != desc.ID {
fksByBackrefTable[constraint.ForeignKey.Table] = append(fksByBackrefTable[constraint.ForeignKey.Table], constraint)
}
}
}
return nil
})
if err != nil {
return nil, err
}
tableIDsToUpdate := make([]sqlbase.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

update := func(descs map[sqlbase.ID]*sqlbase.MutableTableDescriptor) error {
// Reset vars here because update function can be called multiple times in a retry.
isRollback = false
jobSucceeded = true

i := 0
for _, mutation := range desc.Mutations {
scDesc, ok := descs[sc.tableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
}
for _, mutation := range scDesc.Mutations {
if mutation.MutationID != sc.mutationID {
// Mutations are applied in a FIFO order. Only apply the first set of
// mutations if they have the mutation ID we're looking for.
Expand All @@ -1319,16 +1358,33 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
indexDesc != nil {
if sc.canClearRangeForDrop(indexDesc) {
jobSucceeded = false
desc.GCMutations = append(
desc.GCMutations,
scDesc.GCMutations = append(
scDesc.GCMutations,
sqlbase.TableDescriptor_GCDescriptorMutation{
IndexID: indexDesc.ID,
DropTime: now,
JobID: *sc.job.ID(),
})
}
}
if err := desc.MakeMutationComplete(mutation); err != nil {
if constraint := mutation.GetConstraint(); constraint != nil &&
constraint.ConstraintType == sqlbase.ConstraintToUpdate_FOREIGN_KEY &&
mutation.Direction == sqlbase.DescriptorMutation_ADD &&
constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Unvalidated {
// Add backreference on the referenced table (which could be the same table)
backref := sqlbase.ForeignKeyReference{Table: sc.tableID, Index: constraint.ForeignKeyIndex}
backrefTable, ok := descs[constraint.ForeignKey.Table]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
}
backrefIdx, err := backrefTable.FindIndexByID(constraint.ForeignKey.Index)
if err != nil {
return err
}
// TODO (tyler): Consider case where a retry is done and this method is called again
backrefIdx.ReferencedBy = append(backrefIdx.ReferencedBy, backref)
}
if err := scDesc.MakeMutationComplete(mutation); err != nil {
return err
}
i++
Expand All @@ -1339,17 +1395,19 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
return errDidntUpdateDescriptor
}
// Trim the executed mutations from the descriptor.
desc.Mutations = desc.Mutations[i:]
scDesc.Mutations = scDesc.Mutations[i:]

for i, g := range desc.MutationJobs {
for i, g := range scDesc.MutationJobs {
if g.MutationID == sc.mutationID {
// Trim the executed mutation group from the descriptor.
desc.MutationJobs = append(desc.MutationJobs[:i], desc.MutationJobs[i+1:]...)
scDesc.MutationJobs = append(scDesc.MutationJobs[:i], scDesc.MutationJobs[i+1:]...)
break
}
}
return nil
}, func(txn *client.Txn) error {
}

descs, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, func(txn *client.Txn) error {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
Expand Down Expand Up @@ -1384,6 +1442,10 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}{uint32(sc.mutationID)},
)
})
if err != nil {
return nil, err
}
return descs[sc.tableID], nil
}

// notFirstInLine returns true whenever the schema change has been queued
Expand Down Expand Up @@ -1445,7 +1507,7 @@ func (sc *SchemaChanger) refreshStats() {

// reverseMutations reverses the direction of all the mutations with the
// mutationID. This is called after hitting an irrecoverable error while
// applying a schema change. If a column being added is reversed and droped,
// applying a schema change. If a column being added is reversed and dropped,
// all new indexes referencing the column will also be dropped.
func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError error) error {
// Reverse the flow of the state machine.
Expand All @@ -1466,7 +1528,8 @@ func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError erro
}
if constraint := mutation.GetConstraint(); constraint != nil &&
constraint.ConstraintType == sqlbase.ConstraintToUpdate_FOREIGN_KEY &&
mutation.Direction == sqlbase.DescriptorMutation_ADD {
mutation.Direction == sqlbase.DescriptorMutation_ADD &&
constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Validating {
fk := &constraint.ForeignKey
if fk.Table != desc.ID {
fksByBackrefTable[constraint.ForeignKey.Table] = append(fksByBackrefTable[constraint.ForeignKey.Table], constraint)
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/sqlbase/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2407,7 +2407,12 @@ func (desc *MutableTableDescriptor) MakeMutationComplete(m DescriptorMutation) e
if err != nil {
return err
}
idx.ForeignKey.Validity = ConstraintValidity_Validated
switch t.Constraint.ForeignKey.Validity {
case ConstraintValidity_Validating:
idx.ForeignKey.Validity = ConstraintValidity_Validated
case ConstraintValidity_Unvalidated:
idx.ForeignKey = t.Constraint.ForeignKey
}
case ConstraintToUpdate_NOT_NULL:
// Remove the dummy check constraint that was in place during validation
for i, c := range desc.Checks {
Expand Down

0 comments on commit eb16f9f

Please sign in to comment.