Skip to content

Commit

Permalink
ddl: rename column should update foreign key constrain (#37870)
Browse files Browse the repository at this point in the history
close #37868
  • Loading branch information
crazycs520 authored Sep 20, 2022
1 parent df82263 commit 2c35087
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 24 deletions.
62 changes: 61 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -917,6 +918,19 @@ func updateNewIdxColsNameOffset(changingIdxs []*model.IndexInfo,
}
}

func updateFKInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) {
if oldCol.L == newCol.L {
return
}
for _, fk := range tblInfo.ForeignKeys {
for i := range fk.Cols {
if fk.Cols[i].L == oldCol.L {
fk.Cols[i] = newCol
}
}
}
}

// filterIndexesToRemove filters out the indexes that can be removed.
func filterIndexesToRemove(changingIdxs []*model.IndexInfo, colName model.CIStr, tblInfo *model.TableInfo) []*model.IndexInfo {
indexesToRemove := make([]*model.IndexInfo, 0, len(changingIdxs))
Expand Down Expand Up @@ -1401,7 +1415,11 @@ func (w *worker) doModifyColumn(
return ver, errors.Trace(err)
}

ver, err := updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true)
childTableInfos, err := adjustForeignKeyChildTableInfoAfterModifyColumn(d, t, job, tblInfo, newCol, oldCol)
if err != nil {
return ver, errors.Trace(err)
}
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true, childTableInfos...)
if err != nil {
// Modified the type definition of 'null' to 'not null' before this, so rollBack the job when an error occurs.
job.State = model.JobStateRollingback
Expand Down Expand Up @@ -1430,9 +1448,51 @@ func adjustTableInfoAfterModifyColumn(
tblInfo.Columns[oldCol.Offset] = newCol
tblInfo.MoveColumnInfo(oldCol.Offset, destOffset)
updateNewIdxColsNameOffset(tblInfo.Indices, oldCol.Name, newCol)
updateFKInfoWhenModifyColumn(tblInfo, oldCol.Name, newCol.Name)
return nil
}

func adjustForeignKeyChildTableInfoAfterModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, newCol, oldCol *model.ColumnInfo) ([]schemaIDAndTableInfo, error) {
if !variable.EnableForeignKey.Load() || newCol.Name.L == oldCol.Name.L {
return nil, nil
}
is, err := getAndCheckLatestInfoSchema(d, t)
if err != nil {
return nil, err
}
referredFKs := is.GetTableReferredForeignKeys(job.SchemaName, tblInfo.Name.L)
if len(referredFKs) == 0 {
return nil, nil
}
fkh := newForeignKeyHelper(job.SchemaName, job.SchemaID, tblInfo)
for _, referredFK := range referredFKs {
info, err := fkh.getTableFromStorage(is, t, referredFK.ChildSchema, referredFK.ChildTable)
if err != nil {
if infoschema.ErrTableNotExists.Equal(err) || infoschema.ErrDatabaseNotExists.Equal(err) {
continue
}
return nil, err
}
fkInfo := model.FindFKInfoByName(info.tblInfo.ForeignKeys, referredFK.ChildFKName.L)
if fkInfo == nil {
continue
}
for i := range fkInfo.RefCols {
if fkInfo.RefCols[i].L == oldCol.Name.L {
fkInfo.RefCols[i] = newCol.Name
}
}
}
infoList := make([]schemaIDAndTableInfo, 0, len(fkh.loaded))
for _, info := range fkh.loaded {
if info.tblInfo.ID == tblInfo.ID {
continue
}
infoList = append(infoList, info)
}
return infoList, nil
}

func checkAndApplyAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
oldCol *model.ColumnInfo, newCol *model.ColumnInfo, newAutoRandBits uint64) error {
if newAutoRandBits == 0 {
Expand Down
9 changes: 0 additions & 9 deletions ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,15 +404,6 @@ func TestRenameColumn(t *testing.T) {
// Test renaming to an exist column.
tk.MustGetErrCode("alter table test_rename_column rename column col2 to id", errno.ErrDupFieldName)

// Test renaming the column with foreign key.
tk.MustExec("drop table test_rename_column")
tk.MustExec("create table test_rename_column_base (base int)")
tk.MustExec("create table test_rename_column (col int, foreign key (col) references test_rename_column_base(base))")

tk.MustGetErrCode("alter table test_rename_column rename column col to col1", errno.ErrFKIncompatibleColumns)

tk.MustExec("drop table test_rename_column_base")

// Test renaming generated columns.
tk.MustExec("drop table test_rename_column")
tk.MustExec("create table test_rename_column (id int, col1 int generated always as (id + 1))")
Expand Down
4 changes: 0 additions & 4 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4809,10 +4809,6 @@ func (d *ddl) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *ast.Al
return infoschema.ErrColumnExists.GenWithStackByArgs(newColName)
}

if fkInfo := GetColumnForeignKeyInfo(oldColName.L, tbl.Meta().ForeignKeys); fkInfo != nil {
return dbterror.ErrFKIncompatibleColumns.GenWithStackByArgs(oldColName, fkInfo.Name)
}

// Check generated expression.
for _, col := range allCols {
if col.GeneratedExpr == nil {
Expand Down
10 changes: 9 additions & 1 deletion ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,7 @@ func buildPlacementAffects(oldIDs []int64, newIDs []int64) []*model.AffectedOpti
}

// updateSchemaVersion increments the schema version by 1 and sets SchemaDiff.
func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job, multiInfos ...schemaIDAndTableInfo) (int64, error) {
schemaVersion, err := d.setSchemaVersion(job, d.store)
if err != nil {
return 0, errors.Trace(err)
Expand Down Expand Up @@ -1531,6 +1531,14 @@ func updateSchemaVersion(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error)
default:
diff.TableID = job.TableID
}
for _, info := range multiInfos {
diff.AffectedOpts = append(diff.AffectedOpts, &model.AffectedOption{
SchemaID: info.schemaID,
OldSchemaID: info.schemaID,
TableID: info.tblInfo.ID,
OldTableID: info.tblInfo.ID,
})
}
err = t.SetSchemaDiff(diff)
return schemaVersion, errors.Trace(err)
}
Expand Down
38 changes: 38 additions & 0 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,41 @@ func checkDropColumnWithForeignKeyConstraintInOwner(d *ddlCtx, t *meta.Meta, job
}
return nil
}

type foreignKeyHelper struct {
loaded map[schemaAndTable]schemaIDAndTableInfo
}

type schemaAndTable struct {
schema string
table string
}

func newForeignKeyHelper(schema string, schemaID int64, tblInfo *model.TableInfo) foreignKeyHelper {
h := foreignKeyHelper{loaded: make(map[schemaAndTable]schemaIDAndTableInfo)}
k := schemaAndTable{schema: schema, table: tblInfo.Name.L}
h.loaded[k] = schemaIDAndTableInfo{schemaID: schemaID, tblInfo: tblInfo}
return h
}

func (h *foreignKeyHelper) getTableFromStorage(is infoschema.InfoSchema, t *meta.Meta, schema, table model.CIStr) (result schemaIDAndTableInfo, _ error) {
k := schemaAndTable{schema: schema.L, table: table.L}
if info, ok := h.loaded[k]; ok {
return info, nil
}
db, ok := is.SchemaByName(schema)
if !ok {
return result, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schema))
}
tb, err := is.TableByName(schema, table)
if err != nil {
return result, errors.Trace(err)
}
tbInfo, err := getTableInfo(t, tb.Meta().ID, db.ID)
if err != nil {
return result, errors.Trace(err)
}
result.schemaID, result.tblInfo = db.ID, tbInfo
h.loaded[k] = result
return result, nil
}
87 changes: 87 additions & 0 deletions ddl/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,3 +1169,90 @@ func TestDropColumnWithForeignKey(t *testing.T) {
tk.MustGetErrMsg("alter table t1 drop column b;", "[ddl:1829]Cannot drop column 'b': needed in a foreign key constraint 'fk' of table 't2'")
tk.MustGetErrMsg("alter table t2 drop column a;", "[ddl:1828]Cannot drop column 'a': needed in a foreign key constraint 'fk'")
}

func TestRenameColumnWithForeignKeyMetaInfo(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_enable_foreign_key=1")
tk.MustExec("set @@foreign_key_checks=1;")
tk.MustExec("use test")

tk.MustExec("create table t1 (id int key, a int, b int, foreign key fk(a) references t1(id))")
tk.MustExec("alter table t1 change id kid int")
tk.MustExec("alter table t1 rename column a to aa")
tbl1Info := getTableInfo(t, dom, "test", "t1")
tb1ReferredFKs := getTableInfoReferredForeignKeys(t, dom, "test", "t1")
require.Equal(t, 1, len(tbl1Info.ForeignKeys))
require.Equal(t, 1, len(tb1ReferredFKs))
require.Equal(t, "kid", tb1ReferredFKs[0].Cols[0].L)
require.Equal(t, "kid", tbl1Info.ForeignKeys[0].RefCols[0].L)
require.Equal(t, "aa", tbl1Info.ForeignKeys[0].Cols[0].L)

tk.MustExec("drop table t1")
tk.MustExec("create table t1 (id int key, b int, index(b))")
tk.MustExec("create table t2 (a int, b int, foreign key fk(a) references t1(b));")
tk.MustExec("alter table t2 change a aa int")
tbl1Info = getTableInfo(t, dom, "test", "t1")
tb1ReferredFKs = getTableInfoReferredForeignKeys(t, dom, "test", "t1")
require.Equal(t, 1, len(tb1ReferredFKs))
require.Equal(t, 1, len(tb1ReferredFKs[0].Cols))
require.Equal(t, "b", tb1ReferredFKs[0].Cols[0].L)
tbl2Info := getTableInfo(t, dom, "test", "t2")
tb2ReferredFKs := getTableInfoReferredForeignKeys(t, dom, "test", "t2")
require.Equal(t, 0, len(tb2ReferredFKs))
require.Equal(t, 1, len(tbl2Info.ForeignKeys))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[0].Cols))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[0].RefCols))
require.Equal(t, "aa", tbl2Info.ForeignKeys[0].Cols[0].L)
require.Equal(t, "b", tbl2Info.ForeignKeys[0].RefCols[0].L)

tk.MustExec("alter table t1 change id kid int")
tk.MustExec("alter table t1 change b bb int")
tbl1Info = getTableInfo(t, dom, "test", "t1")
tb1ReferredFKs = getTableInfoReferredForeignKeys(t, dom, "test", "t1")
require.Equal(t, 1, len(tb1ReferredFKs))
require.Equal(t, 1, len(tb1ReferredFKs[0].Cols))
require.Equal(t, "bb", tb1ReferredFKs[0].Cols[0].L)
tbl2Info = getTableInfo(t, dom, "test", "t2")
tb2ReferredFKs = getTableInfoReferredForeignKeys(t, dom, "test", "t2")
require.Equal(t, 0, len(tb2ReferredFKs))
require.Equal(t, 1, len(tbl2Info.ForeignKeys))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[0].Cols))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[0].RefCols))
require.Equal(t, "aa", tbl2Info.ForeignKeys[0].Cols[0].L)
require.Equal(t, "bb", tbl2Info.ForeignKeys[0].RefCols[0].L)

tk.MustExec("drop table t1, t2")
tk.MustExec("create table t1 (id int key, b int, index(b))")
tk.MustExec("create table t2 (a int, b int, foreign key (a) references t1(b), foreign key (b) references t1(b));")
tk.MustExec("alter table t1 change b bb int")
tbl1Info = getTableInfo(t, dom, "test", "t1")
tb1ReferredFKs = getTableInfoReferredForeignKeys(t, dom, "test", "t1")
require.Equal(t, 2, len(tb1ReferredFKs))
require.Equal(t, 1, len(tb1ReferredFKs[0].Cols))
require.Equal(t, 1, len(tb1ReferredFKs[1].Cols))
require.Equal(t, "bb", tb1ReferredFKs[0].Cols[0].L)
require.Equal(t, "bb", tb1ReferredFKs[1].Cols[0].L)
tbl2Info = getTableInfo(t, dom, "test", "t2")
tb2ReferredFKs = getTableInfoReferredForeignKeys(t, dom, "test", "t2")
require.Equal(t, 0, len(tb2ReferredFKs))
require.Equal(t, 2, len(tbl2Info.ForeignKeys))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[0].Cols))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[0].RefCols))
require.Equal(t, "a", tbl2Info.ForeignKeys[0].Cols[0].L)
require.Equal(t, "bb", tbl2Info.ForeignKeys[0].RefCols[0].L)
require.Equal(t, 1, len(tbl2Info.ForeignKeys[1].Cols))
require.Equal(t, 1, len(tbl2Info.ForeignKeys[1].RefCols))
require.Equal(t, "b", tbl2Info.ForeignKeys[1].Cols[0].L)
require.Equal(t, "bb", tbl2Info.ForeignKeys[1].RefCols[0].L)
tk.MustExec("alter table t2 rename column a to aa")
tk.MustExec("alter table t2 change b bb int")
tk.MustQuery("show create table t2").
Check(testkit.Rows("t2 CREATE TABLE `t2` (\n" +
" `aa` int(11) DEFAULT NULL,\n" +
" `bb` int(11) DEFAULT NULL,\n" +
" KEY `fk_1` (`aa`),\n KEY `fk_2` (`bb`),\n" +
" CONSTRAINT `fk_1` FOREIGN KEY (`aa`) REFERENCES `t1` (`bb`),\n" +
" CONSTRAINT `fk_2` FOREIGN KEY (`bb`) REFERENCES `t1` (`bb`)\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}
4 changes: 0 additions & 4 deletions ddl/schematracker/dm_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,10 +567,6 @@ func (d SchemaTracker) renameColumn(ctx sessionctx.Context, ident ast.Ident, spe
return infoschema.ErrColumnExists.GenWithStackByArgs(newColName)
}

if fkInfo := ddl.GetColumnForeignKeyInfo(oldColName.L, tbl.Meta().ForeignKeys); fkInfo != nil {
return dbterror.ErrFKIncompatibleColumns.GenWithStackByArgs(oldColName, fkInfo.Name)
}

// Check generated expression.
for _, col := range allCols {
if col.GeneratedExpr == nil {
Expand Down
35 changes: 30 additions & 5 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,18 +1347,25 @@ func checkTableNotExistsFromStore(t *meta.Meta, schemaID int64, tableName string
}

// updateVersionAndTableInfoWithCheck checks table info validate and updates the schema version and the table information
func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
func updateVersionAndTableInfoWithCheck(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
ver int64, err error) {
err = checkTableInfoValid(tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return updateVersionAndTableInfo(d, t, job, tblInfo, shouldUpdateVer)
for _, info := range multiInfos {
err = checkTableInfoValid(info.tblInfo)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
}
return updateVersionAndTableInfo(d, t, job, tblInfo, shouldUpdateVer, multiInfos...)
}

// updateVersionAndTableInfo updates the schema version and the table information.
func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool) (
func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, shouldUpdateVer bool, multiInfos ...schemaIDAndTableInfo) (
ver int64, err error) {
failpoint.Inject("mockUpdateVersionAndTableInfoErr", func(val failpoint.Value) {
switch val.(int) {
Expand All @@ -1373,7 +1380,7 @@ func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo
}
})
if shouldUpdateVer && (job.MultiSchemaInfo == nil || !job.MultiSchemaInfo.SkipVersion) {
ver, err = updateSchemaVersion(d, t, job)
ver, err = updateSchemaVersion(d, t, job, multiInfos...)
if err != nil {
return 0, errors.Trace(err)
}
Expand All @@ -1382,7 +1389,25 @@ func updateVersionAndTableInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo
if tblInfo.State == model.StatePublic {
tblInfo.UpdateTS = t.StartTS
}
return ver, t.UpdateTable(job.SchemaID, tblInfo)
err = t.UpdateTable(job.SchemaID, tblInfo)
if err != nil {
return 0, errors.Trace(err)
}
for _, info := range multiInfos {
if info.tblInfo.State == model.StatePublic {
info.tblInfo.UpdateTS = t.StartTS
}
err = t.UpdateTable(info.schemaID, info.tblInfo)
if err != nil {
return 0, errors.Trace(err)
}
}
return ver, nil
}

type schemaIDAndTableInfo struct {
schemaID int64
tblInfo *model.TableInfo
}

func onRepairTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
Expand Down

0 comments on commit 2c35087

Please sign in to comment.