Skip to content

Commit

Permalink
ddl: add validate when do add foreign key ddl job (#38169)
Browse files Browse the repository at this point in the history
close #38168
  • Loading branch information
crazycs520 authored Sep 27, 2022
1 parent 0875ae2 commit 7b3c503
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 9 deletions.
19 changes: 12 additions & 7 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6200,17 +6200,22 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
return infoschema.ErrCannotAddForeign
}

// Check the uniqueness of the FK.
for _, fk := range t.Meta().ForeignKeys {
if fk.Name.L == fkName.L {
return dbterror.ErrFkDupName.GenWithStackByArgs(fkName.O)
}
if fkName.L == "" {
fkName = model.NewCIStr(fmt.Sprintf("fk_%d", t.Meta().MaxForeignKeyID+1))
}
err = checkFKDupName(t.Meta(), fkName)
if err != nil {
return err
}

fkInfo, err := buildFKInfo(ctx, fkName, keys, refer, t.Cols())
if err != nil {
return errors.Trace(err)
}
fkCheck := ctx.GetSessionVars().ForeignKeyChecks
err = checkAddForeignKeyValid(is, schema.Name.L, t.Meta(), fkInfo, fkCheck)
if err != nil {
return err
}

job := &model.Job{
SchemaID: schema.ID,
Expand All @@ -6219,7 +6224,7 @@ func (d *ddl) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName mode
TableName: t.Meta().Name.L,
Type: model.ActionAddForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{fkInfo},
Args: []interface{}{fkInfo, fkCheck},
}

err = d.DoDDLJob(ctx, job)
Expand Down
59 changes: 57 additions & 2 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,17 @@ func onCreateForeignKey(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ e
}

var fkInfo model.FKInfo
err = job.DecodeArgs(&fkInfo)
var fkCheck bool
err = job.DecodeArgs(&fkInfo, &fkCheck)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
fkInfo.ID = AllocateIndexID(tblInfo)
err = checkAddForeignKeyValidInOwner(d, t, job, job.SchemaName, tblInfo, &fkInfo, fkCheck)
if err != nil {
return ver, err
}
fkInfo.ID = allocateFKIndexID(tblInfo)
tblInfo.ForeignKeys = append(tblInfo.ForeignKeys, &fkInfo)

originalState := fkInfo.State
Expand Down Expand Up @@ -593,3 +598,53 @@ func checkDatabaseHasForeignKeyReferredInOwner(d *ddlCtx, t *meta.Meta, job *mod
}
return errors.Trace(err)
}

func checkFKDupName(tbInfo *model.TableInfo, fkName model.CIStr) error {
for _, fkInfo := range tbInfo.ForeignKeys {
if fkName.L == fkInfo.Name.L {
return dbterror.ErrFkDupName.GenWithStackByArgs(fkName.O)
}
}
return nil
}

func checkAddForeignKeyValid(is infoschema.InfoSchema, schema string, tbInfo *model.TableInfo, fk *model.FKInfo, fkCheck bool) error {
if !variable.EnableForeignKey.Load() {
return nil
}
err := checkTableForeignKeyValid(is, schema, tbInfo, fk, fkCheck)
if err != nil {
return err
}
if len(fk.Cols) == 1 && tbInfo.PKIsHandle {
pkCol := tbInfo.GetPkColInfo()
if pkCol != nil && pkCol.Name.L == fk.Cols[0].L {
return nil
}
}
// check foreign key columns should have index.
// TODO(crazycs520): we can remove this check after TiDB support auto create index if needed when add foreign key.
if model.FindIndexByColumns(tbInfo, fk.Cols...) == nil {
return errors.Errorf("Failed to add the foreign key constraint. Missing index for '%s' foreign key columns in the table '%s'", fk.Name, tbInfo.Name)
}
return nil
}

func checkAddForeignKeyValidInOwner(d *ddlCtx, t *meta.Meta, job *model.Job, schema string, tbInfo *model.TableInfo, fk *model.FKInfo, fkCheck bool) error {
err := checkFKDupName(tbInfo, fk.Name)
if err != nil {
return err
}
if !variable.EnableForeignKey.Load() {
return nil
}
is, err := getAndCheckLatestInfoSchema(d, t)
if err != nil {
return errors.Trace(err)
}
err = checkAddForeignKeyValid(is, schema, tbInfo, fk, fkCheck)
if err != nil {
job.State = model.JobStateCancelled
}
return errors.Trace(err)
}
58 changes: 58 additions & 0 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/parser/auth"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -1404,6 +1405,63 @@ func TestDropDatabaseWithForeignKeyReferred2(t *testing.T) {
tk.MustExec("drop database test")
}

func TestAddForeignKey(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int);")
tk.MustExec("create table t2 (id int key, b int);")
err := tk.ExecToErr("alter table t2 add foreign key (b) references t1(id);")
require.Error(t, err)
require.Equal(t, "Failed to add the foreign key constraint. Missing index for 'fk_1' foreign key columns in the table 't2'", err.Error())
tk.MustExec("alter table t2 add index(b)")
tk.MustExec("alter table t2 add foreign key (b) references t1(id);")
tbl2Info := getTableInfo(t, dom, "test", "t2")
require.Equal(t, int64(1), tbl2Info.MaxForeignKeyID)
tk.MustGetDBError("alter table t2 add foreign key (b) references t1(b);", infoschema.ErrForeignKeyNoIndexInParent)
tk.MustExec("alter table t1 add index(b)")
tk.MustExec("alter table t2 add foreign key (b) references t1(b);")
tk.MustGetDBError("alter table t2 add foreign key (b) references t2(b);", infoschema.ErrCannotAddForeign)
}

func TestAddForeignKey2(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
d := dom.DDL()
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("create table t1 (id int key, b int, index(b));")
tk.MustExec("create table t2 (id int key, b int, index(b));")
var wg sync.WaitGroup
var addErr error
tc := &ddl.TestDDLCallback{}
tc.OnJobRunBeforeExported = func(job *model.Job) {
if job.SchemaState != model.StatePublic || job.Type != model.ActionDropIndex {
return
}
wg.Add(1)
go func() {
defer wg.Done()
addErr = tk2.ExecToErr("alter table t2 add foreign key (b) references t1(id);")
}()
// make sure tk2's ddl job already put into ddl job queue.
time.Sleep(time.Millisecond * 100)
}
originalHook := d.GetHook()
defer d.SetHook(originalHook)
d.SetHook(tc)

tk.MustExec("alter table t2 drop index b")
wg.Wait()
require.Error(t, addErr)
require.Equal(t, "[ddl:-1]Failed to add the foreign key constraint. Missing index for 'fk_1' foreign key columns in the table 't2'", addErr.Error())
}

func TestRenameTablesWithForeignKey(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit 7b3c503

Please sign in to comment.