Skip to content

Commit

Permalink
support multi-schema-change for drop index (#20)
Browse files Browse the repository at this point in the history
* support multi-schema-change for drop index

* fix integration tests
  • Loading branch information
tangenta authored Mar 14, 2022
1 parent e0da4c5 commit bd47071
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 48 deletions.
11 changes: 6 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}

func locateOffsetForColumn(pos *ast.ColumnPosition, tblInfo *model.TableInfo) (offset int, err error) {
if pos == nil {
return -1, nil
}
// Get column offset.
switch pos.Tp {
case ast.ColumnPositionFirst:
Expand Down Expand Up @@ -362,7 +365,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
for i, newCol := range tblInfo.Columns[:len(tblInfo.Columns)-len(positions)] {
for i, newCol := range tblInfo.Columns[len(tblInfo.Columns)-len(positions):] {
offset, err := locateOffsetForColumn(positions[i], tblInfo)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -540,10 +543,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
if err != nil {
return ver, errors.Trace(err)
}
return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
}

originalState := colInfo.State
Expand Down Expand Up @@ -590,6 +590,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
Expand Down
5 changes: 4 additions & 1 deletion ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,10 @@ func TestCancelDropColumns(t *testing.T) {
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropColumns && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
isDropColTp := job.Type == model.ActionMultiSchemaChange &&
job.MultiSchemaInfo.SubJobs[0].Type == model.ActionDropColumn
if isDropColTp && job.MultiSchemaInfo.SubJobs[0].State == testCase.jobState &&
job.MultiSchemaInfo.SubJobs[0].SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
jobID = job.ID
hookCtx := mock.NewContext()
Expand Down
6 changes: 4 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,14 +709,16 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns ||
historyJob.Type == model.ActionDropIndexes ||
historyJob.Type == model.ActionMultiSchemaChange) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return nil
return errCancelledDDLJob
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
Expand Down
6 changes: 5 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3949,13 +3949,17 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
return err
}

var multiSchemaInfo *model.MultiSchemaInfo
if ctx.GetSessionVars().EnableChangeMultiSchema {
multiSchemaInfo = &model.MultiSchemaInfo{}
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: nil,
MultiSchemaInfo: multiSchemaInfo,
Args: []interface{}{colName},
}

Expand Down
47 changes: 33 additions & 14 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,19 +401,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State != model.JobStateRollbackDone {
break
}

// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(w.ddlJobCtx, job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
err = w.deleteRange(w.ddlJobCtx, job)
}
err = deleteRangeForDropSchemaObjectJob(w, job)
if err != nil {
return errors.Trace(err)
}

switch job.Type {
Expand Down Expand Up @@ -448,6 +438,33 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

// deleteRangeForDropSchemaObjectJob calls w.deleteRange for jobs that leave
// data behind to be cleaned up.
//
// Cancelled jobs are skipped. A multi-schema-change job is expanded: every
// sub-job is turned into a proxy job via cloneFromSubJob and handled
// recursively, stopping at the first error.
func deleteRangeForDropSchemaObjectJob(w *worker, job *model.Job) error {
	if job.IsCancelled() {
		return nil
	}

	if job.Type == model.ActionMultiSchemaChange {
		for _, subJob := range job.MultiSchemaInfo.SubJobs {
			if err := deleteRangeForDropSchemaObjectJob(w, cloneFromSubJob(job, subJob)); err != nil {
				return errors.Trace(err)
			}
		}
		return nil
	}

	needDeleteRange := false
	switch job.Type {
	case model.ActionAddIndex, model.ActionAddPrimaryKey:
		// After rolling back an AddIndex operation, we need to use delete-range
		// to delete the half-done index data. Any other state needs no cleanup.
		needDeleteRange = job.State == model.JobStateRollbackDone
	case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable,
		model.ActionDropIndex, model.ActionDropPrimaryKey,
		model.ActionDropTablePartition, model.ActionTruncateTablePartition,
		model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn,
		model.ActionDropIndexes:
		needDeleteRange = true
	}
	if !needDeleteRange {
		return nil
	}
	return w.deleteRange(w.ddlJobCtx, job)
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
Expand Down Expand Up @@ -768,7 +785,9 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
return convertJob2RollbackJob(w, d, t, job)
if job.Type != model.ActionMultiSchemaChange {
return convertJob2RollbackJob(w, d, t, job)
}
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down
65 changes: 44 additions & 21 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -621,11 +622,9 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Index"))
}

dependentHiddenCols := make([]*model.ColumnInfo, 0)
for _, indexColumn := range indexInfo.Columns {
if tblInfo.Columns[indexColumn.Offset].Hidden {
dependentHiddenCols = append(dependentHiddenCols, tblInfo.Columns[indexColumn.Offset])
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
return updateVersionAndTableInfo(t, job, tblInfo, false)
}

originalState := indexInfo.State
Expand Down Expand Up @@ -656,25 +655,11 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
if len(dependentHiddenCols) > 0 {
firstHiddenOffset := dependentHiddenCols[0].Offset
for i := 0; i < len(dependentHiddenCols); i++ {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset)
}
}

newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexInfo.Name.L {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)
removeDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
Expand Down Expand Up @@ -702,6 +687,44 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

// removeDependentHiddenColumns drops from tblInfo.Columns every hidden column
// that is referenced by a column of idxInfo.
//
// The hidden columns are first moved to the tail of tblInfo.Columns with
// MoveColumnInfo, then the tail is cut off.
func removeDependentHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
	var hiddenOffsets []int
	for _, idxCol := range idxInfo.Columns {
		if c := tblInfo.Columns[idxCol.Offset]; c.Hidden {
			hiddenOffsets = append(hiddenOffsets, c.Offset)
		}
	}

	// Process the largest offset first.
	// NOTE(review): presumably this keeps the smaller, not yet processed
	// offsets valid while columns are shifted — confirm against MoveColumnInfo.
	sort.Sort(sort.Reverse(sort.IntSlice(hiddenOffsets)))

	// Move all the dependent hidden columns to the end.
	last := len(tblInfo.Columns) - 1
	for _, off := range hiddenOffsets {
		tblInfo.MoveColumnInfo(off, last)
	}
	tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(hiddenOffsets)]
}

// removeIndexInfo removes the index whose ID equals idxInfo.ID from
// tblInfo.Indices.
//
// The relative order of the remaining indices is preserved, so dropping an
// index does not reshuffle the others. If no index with that ID is present,
// tblInfo is left untouched.
func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
	indices := tblInfo.Indices
	offset := -1
	for i, idx := range indices {
		if idxInfo.ID == idx.ID {
			offset = i
			break
		}
	}
	if offset == -1 {
		// The target index has been removed.
		return
	}
	// Shift the following indices one slot to the left instead of swapping the
	// target with the last element: a swap would move the last index into the
	// dropped index's position and change the order of the surviving indices.
	tblInfo.Indices = append(indices[:offset], indices[offset+1:]...)
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
Expand Down
8 changes: 4 additions & 4 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := cloneFromSubJob(job, sub)
ver, err = w.runDDLJob(d, t, proxyJob)
mergeBackToSubJob(proxyJob, sub)
handleRevertibleException(job, sub.State, i)
mergeBackToSubJob(proxyJob, sub)
return ver, err
}
// All the sub-jobs are non-revertible.
Expand Down Expand Up @@ -95,7 +95,7 @@ func isFinished(job *model.SubJob) bool {

func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job {
return &model.Job{
ID: 0,
ID: job.ID,
Type: sub.Type,
SchemaID: job.SchemaID,
TableID: job.TableID,
Expand Down Expand Up @@ -135,10 +135,10 @@ func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
if res == model.JobStateRollingback || res == model.JobStateCancelling {
job.State = res
}
// Flush the rollback state and cancelled state to sub-jobs.
// Flush the cancelling state and cancelled state to sub-jobs.
for i, sub := range job.MultiSchemaInfo.SubJobs {
if i < idx {
sub.State = model.JobStateRollingback
sub.State = model.JobStateCancelling
}
if i > idx {
sub.State = model.JobStateCancelled
Expand Down
44 changes: 44 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package ddl_test

import (
"context"
"testing"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/admin"
"github.com/stretchr/testify/require"
)

func TestMultiSchemaChangeAddColumns(t *testing.T) {
Expand Down Expand Up @@ -71,6 +77,38 @@ func TestMultiSchemaChangeAddColumns(t *testing.T) {
tk.MustGetErrCode("alter table t add column b int default 2, add column b int default 3", errno.ErrUnsupportedDDLOperation)
}

// TestMultiSchemaChangeAddColumnsCancelled cancels a multi-column ADD COLUMN
// job once its second sub-job reaches the write-reorganization state, and
// checks that the statement reports a cancelled DDL job and that the table
// still contains only its original column.
func TestMultiSchemaChangeAddColumnsCancelled(t *testing.T) {
	store, dom, clean := testkit.CreateMockStoreAndDomain(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1")

	tk.MustExec("create table t (a int);")
	tk.MustExec("insert into t values (1);")
	var checkErr error
	var once bool
	hook := func(job *model.Job) {
		// Ignore jobs that are not multi-schema changes with at least two
		// sub-jobs, so the hook never dereferences nil or indexes out of range.
		if once || job.MultiSchemaInfo == nil || len(job.MultiSchemaInfo.SubJobs) < 2 {
			return
		}
		if job.MultiSchemaInfo.SubJobs[1].SchemaState != model.StateWriteReorganization {
			return
		}
		once = true
		checkErr = kv.RunInNewTxn(context.Background(), store, false,
			func(ctx context.Context, txn kv.Transaction) error {
				errs, err := admin.CancelJobs(txn, []int64{job.ID})
				// Guard the index: errs may be empty when CancelJobs itself fails.
				if len(errs) > 0 && errs[0] != nil {
					return errs[0]
				}
				return err
			})
	}
	dom.DDL().SetHook(&ddl.TestDDLCallback{Do: dom, OnJobUpdatedExported: hook})
	sql := "alter table t add column b int default 2, add column c int default 3, add column d int default 4;"
	tk.MustGetErrCode(sql, errno.ErrCancelledDDLJob)
	// checkErr is only assigned while the statement above runs, so it must be
	// asserted after the statement rather than right after installing the hook.
	require.NoError(t, checkErr)
	tk.MustQuery("select * from t;").Check(testkit.Rows("1"))
}

func TestMultiSchemaChangeDropColumns(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down Expand Up @@ -191,6 +229,12 @@ func TestMultiSchemaChangeDropIndexes(t *testing.T) {
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, drop index t", errno.ErrUnsupportedDDLOperation)

tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t drop index i1, drop index i2;")
tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist)

// Test drop index with drop column.
/*
tk.MustExec("drop table if exists t")
Expand Down
5 changes: 5 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func IsJobRollbackable(job *model.Job) bool {
model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable,
model.ActionModifyTableAutoIdCache, model.ActionModifySchemaDefaultPlacement:
return job.SchemaState == model.StateNone
case model.ActionMultiSchemaChange:
return job.MultiSchemaInfo.Revertible
}
return true
}
Expand Down Expand Up @@ -166,6 +168,9 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue
}
if job.IsCancelling() {
continue
}

job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
Expand Down

0 comments on commit bd47071

Please sign in to comment.