From bd47071eae930e6b8957de0ae3ce2b678e8b4a60 Mon Sep 17 00:00:00 2001 From: tangenta Date: Mon, 14 Mar 2022 17:51:48 +0800 Subject: [PATCH] support multi-schema-change for drop index (#20) * support multi-schema-change for drop index * fix integration tests --- ddl/column.go | 11 +++--- ddl/column_modify_test.go | 5 ++- ddl/ddl.go | 6 ++- ddl/ddl_api.go | 6 ++- ddl/ddl_worker.go | 47 +++++++++++++++++------- ddl/index.go | 65 ++++++++++++++++++++++----------- ddl/multi_schema_change.go | 8 ++-- ddl/multi_schema_change_test.go | 44 ++++++++++++++++++++++ util/admin/admin.go | 5 +++ 9 files changed, 149 insertions(+), 48 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index b3a85bd663137..855546c0f2105 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -217,6 +217,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } func locateOffsetForColumn(pos *ast.ColumnPosition, tblInfo *model.TableInfo) (offset int, err error) { + if pos == nil { + return -1, nil + } // Get column offset. switch pos.Tp { case ast.ColumnPositionFirst: @@ -362,7 +365,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error case model.StateWriteReorganization: // reorganization -> public // Adjust table column offsets. - for i, newCol := range tblInfo.Columns[:len(tblInfo.Columns)-len(positions)] { + for i, newCol := range tblInfo.Columns[len(tblInfo.Columns)-len(positions):] { offset, err := locateOffsetForColumn(positions[i], tblInfo) if err != nil { return ver, errors.Trace(err) @@ -540,10 +543,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) { } if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible { job.MarkNonRevertible() - ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, false) - if err != nil { - return ver, errors.Trace(err) - } + return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false) } originalState := colInfo.State @@ -590,6 +590,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) { case model.StateDeleteReorganization: // reorganization -> absent // All reorganization jobs are done, drop this column. + tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1) tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1] colInfo.State = model.StateNone ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State) diff --git a/ddl/column_modify_test.go b/ddl/column_modify_test.go index eb0dfd9aaaeba..1eb8141272c4d 100644 --- a/ddl/column_modify_test.go +++ b/ddl/column_modify_test.go @@ -568,7 +568,10 @@ func TestCancelDropColumns(t *testing.T) { var jobID int64 testCase := &testCases[0] hook.OnJobRunBeforeExported = func(job *model.Job) { - if job.Type == model.ActionDropColumns && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState { + isDropColTp := job.Type == model.ActionMultiSchemaChange && + job.MultiSchemaInfo.SubJobs[0].Type == model.ActionDropColumn + if isDropColTp && job.MultiSchemaInfo.SubJobs[0].State == testCase.jobState && + job.MultiSchemaInfo.SubJobs[0].SchemaState == testCase.JobSchemaState { jobIDs := []int64{job.ID} jobID = job.ID hookCtx := mock.NewContext() diff --git a/ddl/ddl.go b/ddl/ddl.go index c757256d28098..319c983f9ccae 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -709,14 +709,16 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error { return errors.Trace(historyJob.Error) } // Only for JobStateCancelled job which is adding columns or drop columns or drop indexes. - if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) { + if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || + historyJob.Type == model.ActionDropIndexes || + historyJob.Type == model.ActionMultiSchemaChange) { if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 { for _, warning := range historyJob.MultiSchemaInfo.Warnings { ctx.GetSessionVars().StmtCtx.AppendWarning(warning) } } logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID)) - return nil + return errCancelledDDLJob } panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil") } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 533cb9f1357a9..65a6f242b0f2c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -3949,13 +3949,17 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa return err } + var multiSchemaInfo *model.MultiSchemaInfo + if ctx.GetSessionVars().EnableChangeMultiSchema { + multiSchemaInfo = &model.MultiSchemaInfo{} + } job := &model.Job{ SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, Type: model.ActionDropColumn, BinlogInfo: &model.HistoryInfo{}, - MultiSchemaInfo: nil, + MultiSchemaInfo: multiSchemaInfo, Args: []interface{}{colName}, } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 1057dd886a3ff..f87eb8df6551c 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -401,19 +401,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) }() - if !job.IsCancelled() { - switch job.Type { - case model.ActionAddIndex, model.ActionAddPrimaryKey: - if job.State != model.JobStateRollbackDone { - break - } - - // After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data. - err = w.deleteRange(w.ddlJobCtx, job) - case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey, - model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes: - err = w.deleteRange(w.ddlJobCtx, job) - } + err = deleteRangeForDropSchemaObjectJob(w, job) + if err != nil { + return errors.Trace(err) } switch job.Type { @@ -448,6 +438,33 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { return errors.Trace(err) } +func deleteRangeForDropSchemaObjectJob(w *worker, job *model.Job) error { + if job.IsCancelled() { + return nil + } + switch job.Type { + case model.ActionAddIndex, model.ActionAddPrimaryKey: + if job.State != model.JobStateRollbackDone { + break + } + // After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data. + return w.deleteRange(w.ddlJobCtx, job) + case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey, + model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes: + return w.deleteRange(w.ddlJobCtx, job) + case model.ActionMultiSchemaChange: + for _, sub := range job.MultiSchemaInfo.SubJobs { + proxyJob := cloneFromSubJob(job, sub) + err := deleteRangeForDropSchemaObjectJob(w, proxyJob) + if err != nil { + return errors.Trace(err) + } + } + return nil + } + return nil +} + func (w *worker) writeDDLSeqNum(job *model.Job) { w.ddlSeqNumMu.Lock() w.ddlSeqNumMu.seqNum++ @@ -768,7 +785,9 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, } // The cause of this job state is that the job is cancelled by client. if job.IsCancelling() { - return convertJob2RollbackJob(w, d, t, job) + if job.Type != model.ActionMultiSchemaChange { + return convertJob2RollbackJob(w, d, t, job) + } } if !job.IsRollingback() && !job.IsCancelling() { diff --git a/ddl/index.go b/ddl/index.go index d8e3657ae01e6..21627d6775583 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -16,6 +16,7 @@ package ddl import ( "context" + "sort" "strings" "sync/atomic" "time" @@ -621,11 +622,9 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Index")) } - dependentHiddenCols := make([]*model.ColumnInfo, 0) - for _, indexColumn := range indexInfo.Columns { - if tblInfo.Columns[indexColumn.Offset].Hidden { - dependentHiddenCols = append(dependentHiddenCols, tblInfo.Columns[indexColumn.Offset]) - } + if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible { + job.MarkNonRevertible() + return updateVersionAndTableInfo(t, job, tblInfo, false) } originalState := indexInfo.State @@ -656,25 +655,11 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { job.SchemaState = model.StateDeleteReorganization case model.StateDeleteReorganization: // reorganization -> absent - if len(dependentHiddenCols) > 0 { - firstHiddenOffset := dependentHiddenCols[0].Offset - for i := 0; i < len(dependentHiddenCols); i++ { - // Set this column's offset to the last and reset all following columns' offsets. - adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset) - } - } - - newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices)) - for _, idx := range tblInfo.Indices { - if idx.Name.L != indexInfo.Name.L { - newIndices = append(newIndices, idx) - } - } - tblInfo.Indices = newIndices // Set column index flag. dropIndexColumnFlag(tblInfo, indexInfo) + removeDependentHiddenColumns(tblInfo, indexInfo) + removeIndexInfo(tblInfo, indexInfo) - tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)] failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) { if val.(bool) { panic("panic test in cancelling add index") @@ -702,6 +687,44 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { return ver, errors.Trace(err) } +func removeDependentHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) { + hiddenColOffs := make([]int, 0) + for _, indexColumn := range idxInfo.Columns { + col := tblInfo.Columns[indexColumn.Offset] + if col.Hidden { + hiddenColOffs = append(hiddenColOffs, col.Offset) + } + } + // Sort the offset in descending order. + sort.Slice(hiddenColOffs, func(i, j int) bool { + return hiddenColOffs[i] > hiddenColOffs[j] + }) + // Move all the dependent hidden columns to the end. + endOffset := len(tblInfo.Columns) - 1 + for _, offset := range hiddenColOffs { + tblInfo.MoveColumnInfo(offset, endOffset) + } + tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(hiddenColOffs)] +} + +func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) { + indices := tblInfo.Indices + offset := -1 + for i, idx := range indices { + if idxInfo.ID == idx.ID { + offset = i + break + } + } + if offset == -1 { + // The target index has been removed. + return + } + // Swap the target index to the end and remove it. + indices[offset], indices[len(indices)-1] = indices[len(indices)-1], indices[offset] + tblInfo.Indices = tblInfo.Indices[:len(tblInfo.Indices)-1] +} + func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) { schemaID := job.SchemaID tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 3feda693daa05..dc6926852a229 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -59,8 +59,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve } proxyJob := cloneFromSubJob(job, sub) ver, err = w.runDDLJob(d, t, proxyJob) - mergeBackToSubJob(proxyJob, sub) handleRevertibleException(job, sub.State, i) + mergeBackToSubJob(proxyJob, sub) return ver, err } // All the sub-jobs are non-revertible. @@ -95,7 +95,7 @@ func isFinished(job *model.SubJob) bool { func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job { return &model.Job{ - ID: 0, + ID: job.ID, Type: sub.Type, SchemaID: job.SchemaID, TableID: job.TableID, @@ -135,10 +135,10 @@ func handleRevertibleException(job *model.Job, res model.JobState, idx int) { if res == model.JobStateRollingback || res == model.JobStateCancelling { job.State = res } - // Flush the rollback state and cancelled state to sub-jobs. + // Flush the cancelling state and cancelled state to sub-jobs. for i, sub := range job.MultiSchemaInfo.SubJobs { if i < idx { - sub.State = model.JobStateRollingback + sub.State = model.JobStateCancelling } if i > idx { sub.State = model.JobStateCancelled diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 2deb19cd73eb1..217ba1d2a990b 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -15,10 +15,16 @@ package ddl_test import ( + "context" "testing" + "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/errno" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/admin" + "github.com/stretchr/testify/require" ) func TestMultiSchemaChangeAddColumns(t *testing.T) { @@ -71,6 +77,38 @@ func TestMultiSchemaChangeAddColumns(t *testing.T) { tk.MustGetErrCode("alter table t add column b int default 2, add column b int default 3", errno.ErrUnsupportedDDLOperation) } +func TestMultiSchemaChangeAddColumnsCancelled(t *testing.T) { + store, dom, clean := testkit.CreateMockStoreAndDomain(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1") + + tk.MustExec("create table t (a int);") + tk.MustExec("insert into t values (1);") + var checkErr error + var once bool + hook := func(job *model.Job) { + if once || job.MultiSchemaInfo.SubJobs[1].SchemaState != model.StateWriteReorganization { + return + } + once = true + checkErr = kv.RunInNewTxn(context.Background(), store, false, + func(ctx context.Context, txn kv.Transaction) error { + errs, err := admin.CancelJobs(txn, []int64{job.ID}) + if errs[0] != nil { + return errs[0] + } + return err + }) + } + dom.DDL().SetHook(&ddl.TestDDLCallback{Do: dom, OnJobUpdatedExported: hook}) + require.NoError(t, checkErr) + sql := "alter table t add column b int default 2, add column c int default 3, add column d int default 4;" + tk.MustGetErrCode(sql, errno.ErrCancelledDDLJob) + tk.MustQuery("select * from t;").Check(testkit.Rows("1")) +} + func TestMultiSchemaChangeDropColumns(t *testing.T) { store, clean := testkit.CreateMockStore(t) defer clean() @@ -191,6 +229,12 @@ func TestMultiSchemaChangeDropIndexes(t *testing.T) { tk.MustExec("create table t (a int, b int, c int, index t(a))") tk.MustGetErrCode("alter table t drop index t, drop index t", errno.ErrUnsupportedDDLOperation) + tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));") + tk.MustExec("insert into t values (1, 2, 3);") + tk.MustExec("alter table t drop index i1, drop index i2;") + tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist) + tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist) + // Test drop index with drop column. /* tk.MustExec("drop table if exists t") diff --git a/util/admin/admin.go b/util/admin/admin.go index 8bb29e142f190..ea39fe070379a 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -119,6 +119,8 @@ func IsJobRollbackable(job *model.Job) bool { model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable, model.ActionModifyTableAutoIdCache, model.ActionModifySchemaDefaultPlacement: return job.SchemaState == model.StateNone + case model.ActionMultiSchemaChange: + return job.MultiSchemaInfo.Revertible } return true } @@ -166,6 +168,9 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) { errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID) continue } + if job.IsCancelling() { + continue + } job.State = model.JobStateCancelling // Make sure RawArgs isn't overwritten.