diff --git a/ddl/fktest/foreign_key_test.go b/ddl/fktest/foreign_key_test.go index 489e125dc8a3c..90b94a4e75dbb 100644 --- a/ddl/fktest/foreign_key_test.go +++ b/ddl/fktest/foreign_key_test.go @@ -427,7 +427,7 @@ func TestRenameTableWithForeignKeyMetaInfo(t *testing.T) { // check the schema diff diff = getLatestSchemaDiff(t, tk) require.Equal(t, model.ActionRenameTable, diff.Type) - require.Equal(t, 1, len(diff.AffectedOpts)) + require.Equal(t, 0, len(diff.AffectedOpts)) require.Equal(t, model.ReferredFKInfo{ Cols: []model.CIStr{model.NewCIStr("id")}, ChildSchema: model.NewCIStr("test2"), diff --git a/ddl/table.go b/ddl/table.go index 3e317032a86b1..744556ac8c5bd 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -990,6 +990,9 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) return ver, errors.Trace(err) } + if job.SchemaState == model.StatePublic { + return finishJobRenameTable(d, t, job) + } newSchemaID := job.SchemaID err := checkTableNotExists(d, t, newSchemaID, tableName.L) if err != nil { @@ -1017,7 +1020,7 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) if err != nil { return ver, errors.Trace(err) } - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + job.SchemaState = model.StatePublic return ver, nil } @@ -1033,6 +1036,10 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error return ver, errors.Trace(err) } + if job.SchemaState == model.StatePublic { + return finishJobRenameTables(d, t, job, tableNames, tableIDs, newSchemaIDs) + } + var tblInfos = make([]*model.TableInfo, 0, len(tableNames)) var err error fkh := newForeignKeyHelper() @@ -1058,7 +1065,7 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error if err != nil { return ver, errors.Trace(err) } - job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos) + job.SchemaState = model.StatePublic return ver, nil } @@ -1155,6 +1162,54 @@ func adjustForeignKeyChildTableInfoAfterRenameTable(d *ddlCtx, t *meta.Meta, job return nil } +// We split the renaming table job into two steps: +// 1. rename table and update the schema version. +// 2. update the job state to JobStateDone. +// This is the requirement from TiCDC because +// - it uses the job state to check whether the DDL is finished. +// - there is a gap between schema reloading and job state updating: +// when the job state is updated to JobStateDone, before the new schema reloaded, +// there may be DMLs that use the old schema. +// - TiCDC cannot handle the DMLs that use the old schema, because +// the commit TS of the DMLs are greater than the job state updating TS. +func finishJobRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) { + tblInfo, err := getTableInfo(t, job.TableID, job.SchemaID) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Trace(err) + } + ver, err := updateSchemaVersion(d, t, job) + if err != nil { + return ver, errors.Trace(err) + } + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + return ver, nil +} + +func finishJobRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job, + tableNames []*model.CIStr, tableIDs, newSchemaIDs []int64) (int64, error) { + tblSchemaIDs := make(map[int64]int64, len(tableIDs)) + for i := range tableIDs { + tblSchemaIDs[tableIDs[i]] = newSchemaIDs[i] + } + tblInfos := make([]*model.TableInfo, 0, len(tableNames)) + for i := range tableIDs { + tblID := tableIDs[i] + tblInfo, err := getTableInfo(t, tblID, tblSchemaIDs[tblID]) + if err != nil { + job.State = model.JobStateCancelled + return 0, errors.Trace(err) + } + tblInfos = append(tblInfos, tblInfo) + } + ver, err := updateSchemaVersion(d, t, job) + if err != nil { + return ver, errors.Trace(err) + } + job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos) + return ver, nil +} + func onModifyTableComment(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { var comment string if err := job.DecodeArgs(&comment); err != nil { diff --git a/ddl/table_test.go b/ddl/table_test.go index cf7fdbf1cd7af..8ffa2cb502db7 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/internal/callback" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" @@ -32,6 +33,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -525,3 +527,52 @@ func TestAlterTTL(t *testing.T) { require.NoError(t, err) require.Empty(t, historyJob.BinlogInfo.TableInfo.TTLInfo) } + +func TestRenameTableIntermediateState(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + originHook := dom.DDL().GetHook() + tk.MustExec("create database db1;") + tk.MustExec("create database db2;") + tk.MustExec("create table db1.t(a int);") + + testCases := []struct { + renameSQL string + insertSQL string + errMsg string + finalDB string + }{ + {"rename table db1.t to db1.t1;", "insert into db1.t values(1);", "[schema:1146]Table 'db1.t' doesn't exist", "db1.t1"}, + {"rename table db1.t1 to db1.t;", "insert into db1.t values(1);", "", "db1.t"}, + {"rename table db1.t to db2.t;", "insert into db1.t values(1);", "[schema:1146]Table 'db1.t' doesn't exist", "db2.t"}, + {"rename table db2.t to db1.t;", "insert into db1.t values(1);", "", "db1.t"}, + } + + for _, tc := range testCases { + hook := &callback.TestDDLCallback{Do: dom} + runInsert := false + fn := func(job *model.Job) { + if job.SchemaState == model.StatePublic && !runInsert && !t.Failed() { + _, err := tk2.Exec(tc.insertSQL) + if len(tc.errMsg) > 0 { + assert.Equal(t, tc.errMsg, err.Error()) + } else { + assert.NoError(t, err) + } + runInsert = true + } + } + hook.OnJobUpdatedExported.Store(&fn) + dom.DDL().SetHook(hook) + tk.MustExec(tc.renameSQL) + result := tk.MustQuery(fmt.Sprintf("select * from %s;", tc.finalDB)) + if len(tc.errMsg) > 0 { + result.Check(testkit.Rows()) + } else { + result.Check(testkit.Rows("1")) + } + tk.MustExec(fmt.Sprintf("delete from %s;", tc.finalDB)) + } + dom.DDL().SetHook(originHook) +} diff --git a/parser/model/ddl.go b/parser/model/ddl.go index d6d8790962382..f7b9ab0a666db 100644 --- a/parser/model/ddl.go +++ b/parser/model/ddl.go @@ -806,7 +806,7 @@ func (job *Job) IsRollbackable() bool { ActionDropForeignKey, ActionDropTablePartition: return job.SchemaState == StatePublic case ActionRebaseAutoID, ActionShardRowID, - ActionTruncateTable, ActionAddForeignKey, ActionRenameTable, + ActionTruncateTable, ActionAddForeignKey, ActionRenameTable, ActionRenameTables, ActionModifyTableCharsetAndCollate, ActionTruncateTablePartition, ActionModifySchemaCharsetAndCollate, ActionRepairTable, ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement: