Skip to content

Commit

Permalink
ddl: fix issue of add foreign key failed but doesn't rollback (#39583)
Browse files Browse the repository at this point in the history
close #39582
  • Loading branch information
crazycs520 authored Dec 2, 2022
1 parent 34c2a78 commit 0470fa3
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 27 deletions.
49 changes: 22 additions & 27 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ func (w *worker) onCreateForeignKey(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
if job.IsRollingback() {
return dropForeignKey(d, t, job, tblInfo, fkInfo.Name)
}
switch job.SchemaState {
case model.StateNone:
err = checkAddForeignKeyValidInOwner(d, t, job.SchemaName, tblInfo, &fkInfo, fkCheck)
Expand All @@ -63,7 +66,7 @@ func (w *worker) onCreateForeignKey(d *ddlCtx, t *meta.Meta, job *model.Job) (ve
case model.StateWriteOnly:
err = checkForeignKeyConstrain(w, job.SchemaName, tblInfo.Name.L, &fkInfo, fkCheck)
if err != nil {
job.State = model.JobStateCancelled
job.State = model.JobStateRollingback
return ver, err
}
tblInfo.ForeignKeys[len(tblInfo.ForeignKeys)-1].State = model.StateWriteReorganization
Expand Down Expand Up @@ -94,54 +97,46 @@ func onDropForeignKey(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ err
return ver, errors.Trace(err)
}

var (
fkName model.CIStr
found bool
fkInfo model.FKInfo
)
var fkName model.CIStr
err = job.DecodeArgs(&fkName)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return dropForeignKey(d, t, job, tblInfo, fkName)
}

func dropForeignKey(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, fkName model.CIStr) (ver int64, err error) {
var fkInfo *model.FKInfo
for _, fk := range tblInfo.ForeignKeys {
if fk.Name.L == fkName.L {
found = true
fkInfo = *fk
fkInfo = fk
break
}
}

if !found {
if fkInfo == nil {
job.State = model.JobStateCancelled
return ver, infoschema.ErrForeignKeyNotExists.GenWithStackByArgs(fkName)
}

nfks := tblInfo.ForeignKeys[:0]
for _, fk := range tblInfo.ForeignKeys {
if fk.Name.L != fkName.L {
nfks = append(nfks, fk)
}
}
tblInfo.ForeignKeys = nfks

originalState := fkInfo.State
switch fkInfo.State {
case model.StatePublic:
// We just support record the foreign key, so we just make it none.
// public -> none
fkInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != fkInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.SchemaState = fkInfo.State
return ver, nil
default:
return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("foreign key", fkInfo.State)
}
job.SchemaState = model.StateNone
return ver, err
}

func allocateFKIndexID(tblInfo *model.TableInfo) int64 {
Expand Down
36 changes: 36 additions & 0 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1513,6 +1513,42 @@ func TestAddForeignKey(t *testing.T) {
tbl2Info = getTableInfo(t, dom, "test", "t2")
require.Equal(t, 0, len(tbl2Info.ForeignKeys))
tk.MustGetDBError("alter table t2 drop index idx_c, add constraint fk_c foreign key (c) references t1(b)", dbterror.ErrDropIndexNeededInForeignKey)

// Test circular dependency add foreign key failed.
tk.MustExec("drop table if exists t1,t2")
tk.MustExec("create table t1 (id int key,a int, index(a));")
tk.MustExec("create table t2 (id int key,a int, foreign key fk(a) references t1(id) ON DELETE CASCADE);")
tk.MustExec("insert into t1 values (1,1);")
err := tk.ExecToErr("ALTER TABLE t1 ADD foreign key fk(a) references t2(id) ON DELETE CASCADE;")
require.Error(t, err)
require.Equal(t, "[ddl:1452]Cannot add or update a child row: a foreign key constraint fails (`test`.`t1`, CONSTRAINT `fk` FOREIGN KEY (`a`) REFERENCES `t2` (`id`) ON DELETE CASCADE)", err.Error())
tbl1Info := getTableInfo(t, dom, "test", "t1")
require.Equal(t, 0, len(tbl1Info.ForeignKeys))
referredFKs := dom.InfoSchema().GetTableReferredForeignKeys("test", "t2")
require.Equal(t, 0, len(referredFKs))
tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" +
" `id` int(11) NOT NULL,\n" +
" `a` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */,\n" +
" KEY `a` (`a`)\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))

// Test add foreign key with auto-create index failed.
tk.MustExec("drop table if exists t1,t2")
tk.MustExec("create table t1 (id int key,a int);")
tk.MustExec("create table t2 (id int key);")
tk.MustExec("insert into t1 values (1,1);")
err = tk.ExecToErr("ALTER TABLE t1 ADD foreign key fk(a) references t2(id) ON DELETE CASCADE;")
require.Error(t, err)
require.Equal(t, "[ddl:1452]Cannot add or update a child row: a foreign key constraint fails (`test`.`t1`, CONSTRAINT `fk` FOREIGN KEY (`a`) REFERENCES `t2` (`id`) ON DELETE CASCADE)", err.Error())
tbl1Info = getTableInfo(t, dom, "test", "t1")
require.Equal(t, 0, len(tbl1Info.ForeignKeys))
referredFKs = dom.InfoSchema().GetTableReferredForeignKeys("test", "t2")
require.Equal(t, 0, len(referredFKs))
tk.MustQuery("show create table t1").Check(testkit.Rows("t1 CREATE TABLE `t1` (\n" +
" `id` int(11) NOT NULL,\n" +
" `a` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

func TestAddForeignKey2(t *testing.T) {
Expand Down

0 comments on commit 0470fa3

Please sign in to comment.