Skip to content

Commit

Permalink
sql: add support for NOT VALID Foreign Key constraints
Browse files Browse the repository at this point in the history
Implemented the support of not validating an ALTER TABLE foreign key
constraint to a table. Within alter_table.go we set the validity of
the foreign key reference to be unvalidated, and within shema_changer.go
we then check the case where the foreign key being added is unvalidated
and do not check its validity.

This allows users to create a FK reference between two tables whose rows
don't comply with the constraint when it's first added.

Release note: None
  • Loading branch information
Tyler314 committed Jul 9, 2019
1 parent f49bd40 commit 033595d
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 30 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ type delayedFK struct {
func addDelayedFKs(ctx context.Context, defs []delayedFK, resolver fkResolver) error {
for _, def := range defs {
if err := sql.ResolveFK(
ctx, nil, resolver, def.tbl, def.def, map[sqlbase.ID]*sqlbase.MutableTableDescriptor{}, sql.NewTable,
ctx, nil, resolver, def.tbl, def.def, map[sqlbase.ID]*sqlbase.MutableTableDescriptor{}, sql.NewTable, tree.ValidationDefault,
); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/importccl/read_import_pgdump.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func readPostgresCreateTable(
continue
}
for _, constraint := range constraints {
if err := sql.ResolveFK(evalCtx.Ctx(), nil /* txn */, fks.resolver, desc, constraint, backrefs, sql.NewTable); err != nil {
if err := sql.ResolveFK(evalCtx.Ctx(), nil /* txn */, fks.resolver, desc, constraint, backrefs, sql.NewTable, tree.ValidationDefault); err != nil {
return nil, err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func (n *alterTableNode) startExec(params runParams) error {
} else {
tableState = NonEmptyTable
}
err = params.p.resolveFK(params.ctx, n.tableDesc, d, affected, tableState)
err = params.p.resolveFK(params.ctx, n.tableDesc, d, affected, tableState, t.ValidationBehavior)
})
if err != nil {
return err
Expand Down
20 changes: 15 additions & 5 deletions pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,9 @@ func (p *planner) resolveFK(
d *tree.ForeignKeyConstraintTableDef,
backrefs map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
ts FKTableState,
validationBehavior tree.ValidationBehavior,
) error {
return ResolveFK(ctx, p.txn, p, tbl, d, backrefs, ts)
return ResolveFK(ctx, p.txn, p, tbl, d, backrefs, ts, validationBehavior)
}

func qualifyFKColErrorWithDB(
Expand Down Expand Up @@ -421,8 +422,8 @@ const (
// It may, in doing so, add to or alter descriptors in the passed in `backrefs`
// map of other tables that need to be updated when this table is created.
// Constraints that are not known to hold for existing data are created
// "unvalidated", but when table is empty (e.g. during creation), no existing
// data imples no existing violations, and thus the constraint can be created
// "unvalidated", but when table is empty (e.g. during creating on), no existing
// data implies no existing violations, and thus the constraint can be created
// without the unvalidated flag.
//
// The caller should pass an instance of fkSelfResolver as
Expand All @@ -437,6 +438,10 @@ const (
//
// The passed Txn is used to lookup databases to qualify names in error messages
// but if nil, will result in unqualified names in those errors.
//
// The passed validationBehavior is used to determine whether or not preexisting
// entries in the backref table need to be validated against the foreign key
// being added.
func ResolveFK(
ctx context.Context,
txn *client.Txn,
Expand All @@ -445,6 +450,7 @@ func ResolveFK(
d *tree.ForeignKeyConstraintTableDef,
backrefs map[sqlbase.ID]*sqlbase.MutableTableDescriptor,
ts FKTableState,
validationBehavior tree.ValidationBehavior,
) error {
for _, col := range d.FromCols {
col, _, err := tbl.FindColumnByName(col)
Expand Down Expand Up @@ -586,7 +592,11 @@ func ResolveFK(
}

if ts != NewTable {
ref.Validity = sqlbase.ConstraintValidity_Validating
if validationBehavior == tree.ValidationSkip {
ref.Validity = sqlbase.ConstraintValidity_Unvalidated
} else {
ref.Validity = sqlbase.ConstraintValidity_Validating
}
}
backref := sqlbase.ForeignKeyReference{Table: tbl.ID}

Expand Down Expand Up @@ -1282,7 +1292,7 @@ func MakeTableDesc(
desc.Checks = append(desc.Checks, ck)

case *tree.ForeignKeyConstraintTableDef:
if err := ResolveFK(ctx, txn, fkResolver, &desc, d, affected, NewTable); err != nil {
if err := ResolveFK(ctx, txn, fkResolver, &desc, d, affected, NewTable, tree.ValidationDefault); err != nil {
return desc, err
}

Expand Down
51 changes: 44 additions & 7 deletions pkg/sql/logictest/testdata/logic_test/fk
Original file line number Diff line number Diff line change
Expand Up @@ -1390,10 +1390,6 @@ subtest unvalidated_fk_plan
# To get an unvalidated foreign key for testing, use the loophole that we
# currently don't support adding a validated FK in the same transaction as
# CREATE TABLE
# TODO (lucy): Once this is no longer true (and we support adding NOT VALID
# constraints), update this test
statement ok
BEGIN

statement ok
CREATE TABLE a (
Expand All @@ -1415,10 +1411,10 @@ statement ok
INSERT INTO b (a_x, a_y, a_z) VALUES ('x2', 'y1', 'z1')

statement ok
ALTER TABLE b ADD CONSTRAINT fk_ref FOREIGN KEY (a_z, a_y, a_x) REFERENCES a (z, y, x)
ALTER TABLE b ADD CONSTRAINT fk_ref FOREIGN KEY (a_z, a_y, a_x) REFERENCES a (z, y, x) NOT VALID

statement ok
COMMIT
statement error pq: foreign key violation: "b" row a_z='z1', a_y='y1', a_x='x2', rowid=[0-9]* has no match in "a"
ALTER TABLE b VALIDATE CONSTRAINT fk_ref

# Verify that the optimizer doesn't use an unvalidated constraint to simplify plans.
query TTT
Expand Down Expand Up @@ -2101,3 +2097,44 @@ DELETE FROM t1 WHERE x IS NULL

statement ok
DROP TABLE t2, t2 CASCADE

subtest test_not_valid_fk

statement ok
CREATE TABLE person (id INT PRIMARY KEY, age INT, name STRING)

statement ok
CREATE TABLE pet (id INT PRIMARY KEY, name STRING)

statement ok
INSERT INTO pet VALUES (0, 'crookshanks')

statement error pq: foreign key violation: "pet" row id=0 has no match in "person"
ALTER TABLE pet ADD CONSTRAINT fk_constraint FOREIGN KEY (id) REFERENCES person (id)

statement ok
ALTER TABLE pet ADD CONSTRAINT fk_constraint FOREIGN KEY (id) REFERENCES person (id) NOT VALID

query TTTTB
SHOW CONSTRAINTS FROM pet
----
pet fk_constraint FOREIGN KEY FOREIGN KEY (id) REFERENCES person(id) false
pet primary PRIMARY KEY PRIMARY KEY (id ASC) true

statement error pq: foreign key violation: "pet" row id=0 has no match in "person"
ALTER TABLE pet VALIDATE CONSTRAINT fk_constraint

statement ok
INSERT INTO person VALUES (0, 18, 'Hermione Granger')

statement ok
ALTER TABLE pet VALIDATE CONSTRAINT fk_constraint

query TTTTB
SHOW CONSTRAINTS FROM pet
----
pet fk_constraint FOREIGN KEY FOREIGN KEY (id) REFERENCES person(id) true
pet primary PRIMARY KEY PRIMARY KEY (id ASC) true

statement ok
DROP TABLE person, pet
16 changes: 14 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/schema_change_in_txn
Original file line number Diff line number Diff line change
Expand Up @@ -934,17 +934,23 @@ statement ok
ALTER TABLE check_table ADD c INT

statement ok
ALTER TABLE check_table ADD CONSTRAINT c_0 CHECK (c > 0)
ALTER TABLE check_table ADD CONSTRAINT c_0 CHECK (c > 0) NOT VALID

statement ok
ALTER TABLE check_table ADD d INT DEFAULT 1

statement ok
ALTER TABLE check_table ADD CONSTRAINT d_0 CHECK (d > 0)
ALTER TABLE check_table ADD CONSTRAINT d_0 CHECK (d > 0) NOT VALID

statement ok
COMMIT

statement ok
ALTER TABLE check_table VALIDATE CONSTRAINT c_0

statement ok
ALTER TABLE check_table VALIDATE CONSTRAINT d_0

query TTTTB
SHOW CONSTRAINTS FROM check_table
----
Expand Down Expand Up @@ -977,6 +983,12 @@ ALTER TABLE check_table ADD CONSTRAINT e_0 CHECK (e::INT > 0)
statement error pgcode XXA00 validate check constraint: could not parse "a" as type int
COMMIT

statement ok
ALTER TABLE check_table VALIDATE CONSTRAINT c_0

statement ok
ALTER TABLE check_table VALIDATE CONSTRAINT d_0

# Constraint e_0 was not added
query TTTTB
SHOW CONSTRAINTS FROM check_table
Expand Down
5 changes: 4 additions & 1 deletion pkg/sql/opt/exec/execbuilder/testdata/fk
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ CREATE TABLE b (
)

statement ok
ALTER TABLE b ADD CONSTRAINT fk_ref FOREIGN KEY (a_z, a_y, a_x) REFERENCES a (z, y, x)
ALTER TABLE b ADD CONSTRAINT fk_ref FOREIGN KEY (a_z, a_y, a_x) REFERENCES a (z, y, x) NOT VALID

statement ok
ALTER TABLE b VALIDATE CONSTRAINT fk_ref

statement ok
COMMIT
Expand Down
86 changes: 75 additions & 11 deletions pkg/sql/schema_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,13 +1211,53 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
isRollback := false
jobSucceeded := true
now := timeutil.Now().UnixNano()
return sc.leaseMgr.Publish(ctx, sc.tableID, func(desc *sqlbase.MutableTableDescriptor) error {

// Get the other tables whose foreign key backreferences need to be removed.
var fksByBackrefTable map[sqlbase.ID][]*sqlbase.ConstraintToUpdate
err := sc.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
fksByBackrefTable = make(map[sqlbase.ID][]*sqlbase.ConstraintToUpdate)

desc, err := sqlbase.GetTableDescFromID(ctx, txn, sc.tableID)
if err != nil {
return err
}
for _, mutation := range desc.Mutations {
if mutation.MutationID != sc.mutationID {
break
}
if constraint := mutation.GetConstraint(); constraint != nil &&
constraint.ConstraintType == sqlbase.ConstraintToUpdate_FOREIGN_KEY &&
mutation.Direction == sqlbase.DescriptorMutation_ADD &&
constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Unvalidated {
// Add backref table to referenced table with an unvalidated foreign key constraint
fk := &constraint.ForeignKey
if fk.Table != desc.ID {
fksByBackrefTable[constraint.ForeignKey.Table] = append(fksByBackrefTable[constraint.ForeignKey.Table], constraint)
}
}
}
return nil
})
if err != nil {
return nil, err
}
tableIDsToUpdate := make([]sqlbase.ID, 0, len(fksByBackrefTable)+1)
tableIDsToUpdate = append(tableIDsToUpdate, sc.tableID)
for id := range fksByBackrefTable {
tableIDsToUpdate = append(tableIDsToUpdate, id)
}

update := func(descs map[sqlbase.ID]*sqlbase.MutableTableDescriptor) error {
// Reset vars here because update function can be called multiple times in a retry.
isRollback = false
jobSucceeded = true

i := 0
for _, mutation := range desc.Mutations {
scDesc, ok := descs[sc.tableID]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
}
for _, mutation := range scDesc.Mutations {
if mutation.MutationID != sc.mutationID {
// Mutations are applied in a FIFO order. Only apply the first set of
// mutations if they have the mutation ID we're looking for.
Expand All @@ -1228,16 +1268,32 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
indexDesc != nil {
if sc.canClearRangeForDrop(indexDesc) {
jobSucceeded = false
desc.GCMutations = append(
desc.GCMutations,
scDesc.GCMutations = append(
scDesc.GCMutations,
sqlbase.TableDescriptor_GCDescriptorMutation{
IndexID: indexDesc.ID,
DropTime: now,
JobID: *sc.job.ID(),
})
}
}
if err := desc.MakeMutationComplete(mutation); err != nil {
if constraint := mutation.GetConstraint(); constraint != nil &&
constraint.ConstraintType == sqlbase.ConstraintToUpdate_FOREIGN_KEY &&
mutation.Direction == sqlbase.DescriptorMutation_ADD &&
constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Unvalidated {
// Add backreference on the referenced table (which could be the same table)
backref := sqlbase.ForeignKeyReference{Table: sc.tableID, Index: constraint.ForeignKeyIndex}
backrefTable, ok := descs[constraint.ForeignKey.Table]
if !ok {
return errors.AssertionFailedf("required table with ID %d not provided to update closure", sc.tableID)
}
backrefIdx, err := backrefTable.FindIndexByID(constraint.ForeignKey.Index)
if err != nil {
return err
}
backrefIdx.ReferencedBy = append(backrefIdx.ReferencedBy, backref)
}
if err := scDesc.MakeMutationComplete(mutation); err != nil {
return err
}
i++
Expand All @@ -1248,17 +1304,20 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
return errDidntUpdateDescriptor
}
// Trim the executed mutations from the descriptor.
desc.Mutations = desc.Mutations[i:]
scDesc.Mutations = scDesc.Mutations[i:]

for i, g := range desc.MutationJobs {
for i, g := range scDesc.MutationJobs {
if g.MutationID == sc.mutationID {
// Trim the executed mutation group from the descriptor.
desc.MutationJobs = append(desc.MutationJobs[:i], desc.MutationJobs[i+1:]...)
scDesc.MutationJobs = append(scDesc.MutationJobs[:i], scDesc.MutationJobs[i+1:]...)
break
}
}
return nil
}, func(txn *client.Txn) error {
}

// Call PublishMultiple to handle the situation to add Foreign Key backreferences.
descs, err := sc.leaseMgr.PublishMultiple(ctx, tableIDsToUpdate, update, func(txn *client.Txn) error {
if jobSucceeded {
if err := sc.job.WithTxn(txn).Succeeded(ctx, jobs.NoopFn); err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err,
Expand Down Expand Up @@ -1293,6 +1352,10 @@ func (sc *SchemaChanger) done(ctx context.Context) (*sqlbase.ImmutableTableDescr
}{uint32(sc.mutationID)},
)
})
if err != nil {
return nil, err
}
return descs[sc.tableID], nil
}

// notFirstInLine returns true whenever the schema change has been queued
Expand Down Expand Up @@ -1354,7 +1417,7 @@ func (sc *SchemaChanger) refreshStats() {

// reverseMutations reverses the direction of all the mutations with the
// mutationID. This is called after hitting an irrecoverable error while
// applying a schema change. If a column being added is reversed and droped,
// applying a schema change. If a column being added is reversed and dropped,
// all new indexes referencing the column will also be dropped.
func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError error) error {
// Reverse the flow of the state machine.
Expand All @@ -1375,7 +1438,8 @@ func (sc *SchemaChanger) reverseMutations(ctx context.Context, causingError erro
}
if constraint := mutation.GetConstraint(); constraint != nil &&
constraint.ConstraintType == sqlbase.ConstraintToUpdate_FOREIGN_KEY &&
mutation.Direction == sqlbase.DescriptorMutation_ADD {
mutation.Direction == sqlbase.DescriptorMutation_ADD &&
constraint.ForeignKey.Validity == sqlbase.ConstraintValidity_Validating {
fk := &constraint.ForeignKey
if fk.Table != desc.ID {
fksByBackrefTable[constraint.ForeignKey.Table] = append(fksByBackrefTable[constraint.ForeignKey.Table], constraint)
Expand Down
17 changes: 16 additions & 1 deletion pkg/sql/sqlbase/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2374,6 +2374,10 @@ func (desc *TableDescriptor) IsInterleaved() bool {
}

// MakeMutationComplete updates the descriptor upon completion of a mutation.
// There are three Validity types for the mutations:
// Validated - The constraint has already been added and successfully validated.
// Validating - The constraint has already been added, and just needs to be marked as validated.
// Unvalidated - The constraint has not yet been added.
func (desc *MutableTableDescriptor) MakeMutationComplete(m DescriptorMutation) error {
switch m.Direction {
case DescriptorMutation_ADD:
Expand All @@ -2391,6 +2395,7 @@ func (desc *MutableTableDescriptor) MakeMutationComplete(m DescriptorMutation) e
case ConstraintToUpdate_CHECK:
switch t.Constraint.Check.Validity {
case ConstraintValidity_Unvalidated:
// add the constraint to a slice of constraints to be checked
desc.Checks = append(desc.Checks, &t.Constraint.Check)
case ConstraintValidity_Validating:
for _, c := range desc.Checks {
Expand All @@ -2403,11 +2408,21 @@ func (desc *MutableTableDescriptor) MakeMutationComplete(m DescriptorMutation) e
return errors.AssertionFailedf("invalid constraint validity state: %d", t.Constraint.Check.Validity)
}
case ConstraintToUpdate_FOREIGN_KEY:
// Takes care of adding the Foreign Key to the table index. Adding the backref table to the table index of the
// referenced table is taken care in another call, which is done in a call to makeMutationComplete later
// in this function.
// TODO (tyler): Combine both of these tasks in the same place.
idx, err := desc.FindIndexByID(t.Constraint.ForeignKeyIndex)
if err != nil {
return err
}
idx.ForeignKey.Validity = ConstraintValidity_Validated
switch t.Constraint.ForeignKey.Validity {
case ConstraintValidity_Validating:
idx.ForeignKey.Validity = ConstraintValidity_Validated
case ConstraintValidity_Unvalidated:
// In the case of Foreign Keys, we update the index to hold the FK to be added
idx.ForeignKey = t.Constraint.ForeignKey
}
case ConstraintToUpdate_NOT_NULL:
// Remove the dummy check constraint that was in place during validation
for i, c := range desc.Checks {
Expand Down

0 comments on commit 033595d

Please sign in to comment.