Skip to content

Commit

Permalink
ddl: wait schema change before rename table job is done (#43341)
Browse files Browse the repository at this point in the history
close #43338
  • Loading branch information
tangenta authored Apr 27, 2023
1 parent 822767b commit 2e951bc
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ddl/fktest/foreign_key_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ func TestRenameTableWithForeignKeyMetaInfo(t *testing.T) {
// check the schema diff
diff = getLatestSchemaDiff(t, tk)
require.Equal(t, model.ActionRenameTable, diff.Type)
require.Equal(t, 1, len(diff.AffectedOpts))
require.Equal(t, 0, len(diff.AffectedOpts))
require.Equal(t, model.ReferredFKInfo{
Cols: []model.CIStr{model.NewCIStr("id")},
ChildSchema: model.NewCIStr("test2"),
Expand Down
59 changes: 57 additions & 2 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,9 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

if job.SchemaState == model.StatePublic {
return finishJobRenameTable(d, t, job)
}
newSchemaID := job.SchemaID
err := checkTableNotExists(d, t, newSchemaID, tableName.L)
if err != nil {
Expand Down Expand Up @@ -1017,7 +1020,7 @@ func onRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
job.SchemaState = model.StatePublic
return ver, nil
}

Expand All @@ -1033,6 +1036,10 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
return ver, errors.Trace(err)
}

if job.SchemaState == model.StatePublic {
return finishJobRenameTables(d, t, job, tableNames, tableIDs, newSchemaIDs)
}

var tblInfos = make([]*model.TableInfo, 0, len(tableNames))
var err error
fkh := newForeignKeyHelper()
Expand All @@ -1058,7 +1065,7 @@ func onRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error
if err != nil {
return ver, errors.Trace(err)
}
job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos)
job.SchemaState = model.StatePublic
return ver, nil
}

Expand Down Expand Up @@ -1155,6 +1162,54 @@ func adjustForeignKeyChildTableInfoAfterRenameTable(d *ddlCtx, t *meta.Meta, job
return nil
}

// We split the renaming table job into two steps:
// 1. rename table and update the schema version.
// 2. update the job state to JobStateDone.
// This is the requirement from TiCDC because
// - it uses the job state to check whether the DDL is finished.
// - there is a gap between schema reloading and job state updating:
// when the job state is updated to JobStateDone, before the new schema reloaded,
// there may be DMLs that use the old schema.
// - TiCDC cannot handle the DMLs that use the old schema, because
// the commit TS of the DMLs are greater than the job state updating TS.
func finishJobRenameTable(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, error) {
tblInfo, err := getTableInfo(t, job.TableID, job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
ver, err := updateSchemaVersion(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

func finishJobRenameTables(d *ddlCtx, t *meta.Meta, job *model.Job,
tableNames []*model.CIStr, tableIDs, newSchemaIDs []int64) (int64, error) {
tblSchemaIDs := make(map[int64]int64, len(tableIDs))
for i := range tableIDs {
tblSchemaIDs[tableIDs[i]] = newSchemaIDs[i]
}
tblInfos := make([]*model.TableInfo, 0, len(tableNames))
for i := range tableIDs {
tblID := tableIDs[i]
tblInfo, err := getTableInfo(t, tblID, tblSchemaIDs[tblID])
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
tblInfos = append(tblInfos, tblInfo)
}
ver, err := updateSchemaVersion(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishMultipleTableJob(model.JobStateDone, model.StatePublic, ver, tblInfos)
return ver, nil
}

func onModifyTableComment(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var comment string
if err := job.DecodeArgs(&comment); err != nil {
Expand Down
51 changes: 51 additions & 0 deletions ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/ddl/internal/callback"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -525,3 +527,52 @@ func TestAlterTTL(t *testing.T) {
require.NoError(t, err)
require.Empty(t, historyJob.BinlogInfo.TableInfo.TTLInfo)
}

func TestRenameTableIntermediateState(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
originHook := dom.DDL().GetHook()
tk.MustExec("create database db1;")
tk.MustExec("create database db2;")
tk.MustExec("create table db1.t(a int);")

testCases := []struct {
renameSQL string
insertSQL string
errMsg string
finalDB string
}{
{"rename table db1.t to db1.t1;", "insert into db1.t values(1);", "[schema:1146]Table 'db1.t' doesn't exist", "db1.t1"},
{"rename table db1.t1 to db1.t;", "insert into db1.t values(1);", "", "db1.t"},
{"rename table db1.t to db2.t;", "insert into db1.t values(1);", "[schema:1146]Table 'db1.t' doesn't exist", "db2.t"},
{"rename table db2.t to db1.t;", "insert into db1.t values(1);", "", "db1.t"},
}

for _, tc := range testCases {
hook := &callback.TestDDLCallback{Do: dom}
runInsert := false
fn := func(job *model.Job) {
if job.SchemaState == model.StatePublic && !runInsert && !t.Failed() {
_, err := tk2.Exec(tc.insertSQL)
if len(tc.errMsg) > 0 {
assert.Equal(t, tc.errMsg, err.Error())
} else {
assert.NoError(t, err)
}
runInsert = true
}
}
hook.OnJobUpdatedExported.Store(&fn)
dom.DDL().SetHook(hook)
tk.MustExec(tc.renameSQL)
result := tk.MustQuery(fmt.Sprintf("select * from %s;", tc.finalDB))
if len(tc.errMsg) > 0 {
result.Check(testkit.Rows())
} else {
result.Check(testkit.Rows("1"))
}
tk.MustExec(fmt.Sprintf("delete from %s;", tc.finalDB))
}
dom.DDL().SetHook(originHook)
}
2 changes: 1 addition & 1 deletion parser/model/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (job *Job) IsRollbackable() bool {
ActionDropForeignKey, ActionDropTablePartition:
return job.SchemaState == StatePublic
case ActionRebaseAutoID, ActionShardRowID,
ActionTruncateTable, ActionAddForeignKey, ActionRenameTable,
ActionTruncateTable, ActionAddForeignKey, ActionRenameTable, ActionRenameTables,
ActionModifyTableCharsetAndCollate, ActionTruncateTablePartition,
ActionModifySchemaCharsetAndCollate, ActionRepairTable,
ActionModifyTableAutoIdCache, ActionModifySchemaDefaultPlacement:
Expand Down

0 comments on commit 2e951bc

Please sign in to comment.