Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support multi-schema-change for drop index #20

Merged
merged 2 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}

func locateOffsetForColumn(pos *ast.ColumnPosition, tblInfo *model.TableInfo) (offset int, err error) {
if pos == nil {
return -1, nil
}
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

locateOffsetForColumn is also used by AddColumn, so pos can be nil.

// Get column offset.
switch pos.Tp {
case ast.ColumnPositionFirst:
Expand Down Expand Up @@ -362,7 +365,7 @@ func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
for i, newCol := range tblInfo.Columns[:len(tblInfo.Columns)-len(positions)] {
for i, newCol := range tblInfo.Columns[len(tblInfo.Columns)-len(positions):] {
Copy link
Owner Author

@tangenta tangenta Mar 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a test in TestMultiSchemaChangeAddColumns:

create table t (a int, b int, c int);
alter table t add column (d int default 4, e int default 5);

offset, err := locateOffsetForColumn(positions[i], tblInfo)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -540,10 +543,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
if err != nil {
return ver, errors.Trace(err)
}
return updateVersionAndTableInfoWithCheck(t, job, tblInfo, false)
}

originalState := colInfo.State
Expand Down Expand Up @@ -590,6 +590,7 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
colInfo.State = model.StateNone
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != colInfo.State)
Expand Down
5 changes: 4 additions & 1 deletion ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,10 @@ func TestCancelDropColumns(t *testing.T) {
var jobID int64
testCase := &testCases[0]
hook.OnJobRunBeforeExported = func(job *model.Job) {
if job.Type == model.ActionDropColumns && job.State == testCase.jobState && job.SchemaState == testCase.JobSchemaState {
isDropColTp := job.Type == model.ActionMultiSchemaChange &&
job.MultiSchemaInfo.SubJobs[0].Type == model.ActionDropColumn
if isDropColTp && job.MultiSchemaInfo.SubJobs[0].State == testCase.jobState &&
job.MultiSchemaInfo.SubJobs[0].SchemaState == testCase.JobSchemaState {
jobIDs := []int64{job.ID}
jobID = job.ID
hookCtx := mock.NewContext()
Expand Down
6 changes: 4 additions & 2 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,14 +709,16 @@ func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns ||
historyJob.Type == model.ActionDropIndexes ||
historyJob.Type == model.ActionMultiSchemaChange) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}
}
logutil.BgLogger().Info("[ddl] DDL job is cancelled", zap.Int64("jobID", jobID))
return nil
return errCancelledDDLJob
}
panic("When the state is JobStateRollbackDone or JobStateCancelled, historyJob.Error should never be nil")
}
Expand Down
6 changes: 5 additions & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3949,13 +3949,17 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
return err
}

var multiSchemaInfo *model.MultiSchemaInfo
if ctx.GetSessionVars().EnableChangeMultiSchema {
multiSchemaInfo = &model.MultiSchemaInfo{}
}
job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: nil,
MultiSchemaInfo: multiSchemaInfo,
Args: []interface{}{colName},
}

Expand Down
47 changes: 33 additions & 14 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,19 +401,9 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
metrics.DDLWorkerHistogram.WithLabelValues(metrics.WorkerFinishDDLJob, job.Type.String(), metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}()

if !job.IsCancelled() {
switch job.Type {
case model.ActionAddIndex, model.ActionAddPrimaryKey:
if job.State != model.JobStateRollbackDone {
break
}

// After rolling back an AddIndex operation, we need to use delete-range to delete the half-done index data.
err = w.deleteRange(w.ddlJobCtx, job)
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
err = w.deleteRange(w.ddlJobCtx, job)
}
err = deleteRangeForDropSchemaObjectJob(w, job)
if err != nil {
return errors.Trace(err)
}

switch job.Type {
Expand Down Expand Up @@ -448,6 +438,33 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
return errors.Trace(err)
}

// deleteRangeForDropSchemaObjectJob issues the delete-range request that
// belongs to job, if any. Cancelled jobs are skipped. For a multi-schema-change
// job every sub-job is turned into a proxy job and handled recursively; the
// first failure is returned.
func deleteRangeForDropSchemaObjectJob(w *worker, job *model.Job) error {
	if job.IsCancelled() {
		return nil
	}

	switch job.Type {
	case model.ActionAddIndex, model.ActionAddPrimaryKey:
		// After rolling back an AddIndex operation, we need to use delete-range
		// to delete the half-done index data. Any other state needs no cleanup.
		if job.State == model.JobStateRollbackDone {
			return w.deleteRange(w.ddlJobCtx, job)
		}
	case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropPrimaryKey,
		model.ActionDropTablePartition, model.ActionTruncateTablePartition, model.ActionDropColumn, model.ActionDropColumns, model.ActionModifyColumn, model.ActionDropIndexes:
		return w.deleteRange(w.ddlJobCtx, job)
	case model.ActionMultiSchemaChange:
		for _, subJob := range job.MultiSchemaInfo.SubJobs {
			if err := deleteRangeForDropSchemaObjectJob(w, cloneFromSubJob(job, subJob)); err != nil {
				return errors.Trace(err)
			}
		}
	}
	return nil
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
Expand Down Expand Up @@ -768,7 +785,9 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
}
// The cause of this job state is that the job is cancelled by client.
if job.IsCancelling() {
return convertJob2RollbackJob(w, d, t, job)
if job.Type != model.ActionMultiSchemaChange {
return convertJob2RollbackJob(w, d, t, job)
}
}

if !job.IsRollingback() && !job.IsCancelling() {
Expand Down
65 changes: 44 additions & 21 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package ddl

import (
"context"
"sort"
"strings"
"sync/atomic"
"time"
Expand Down Expand Up @@ -621,11 +622,9 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Drop Index"))
}

dependentHiddenCols := make([]*model.ColumnInfo, 0)
for _, indexColumn := range indexInfo.Columns {
if tblInfo.Columns[indexColumn.Offset].Hidden {
dependentHiddenCols = append(dependentHiddenCols, tblInfo.Columns[indexColumn.Offset])
}
if job.MultiSchemaInfo != nil && job.MultiSchemaInfo.Revertible {
job.MarkNonRevertible()
return updateVersionAndTableInfo(t, job, tblInfo, false)
}

originalState := indexInfo.State
Expand Down Expand Up @@ -656,25 +655,11 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
if len(dependentHiddenCols) > 0 {
firstHiddenOffset := dependentHiddenCols[0].Offset
for i := 0; i < len(dependentHiddenCols); i++ {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, firstHiddenOffset)
}
}

newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexInfo.Name.L {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)
removeDependentHiddenColumns(tblInfo, indexInfo)
removeIndexInfo(tblInfo, indexInfo)

tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(dependentHiddenCols)]
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
if val.(bool) {
panic("panic test in cancelling add index")
Expand Down Expand Up @@ -702,6 +687,44 @@ func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) {
return ver, errors.Trace(err)
}

// removeDependentHiddenColumns drops from tblInfo.Columns every hidden column
// that idxInfo references. It collects the offsets of those columns, moves each
// of them to the tail of tblInfo.Columns via MoveColumnInfo, and then truncates
// the slice by the number of hidden columns found.
func removeDependentHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffs := make([]int, 0)
for _, indexColumn := range idxInfo.Columns {
col := tblInfo.Columns[indexColumn.Offset]
if col.Hidden {
hiddenColOffs = append(hiddenColOffs, col.Offset)
}
}
// Sort the offset in descending order.
sort.Slice(hiddenColOffs, func(i, j int) bool {
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
return hiddenColOffs[i] > hiddenColOffs[j]
})
// Move all the dependent hidden columns to the end.
endOffset := len(tblInfo.Columns) - 1
for _, offset := range hiddenColOffs {
tblInfo.MoveColumnInfo(offset, endOffset)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
tblInfo.MoveColumnInfo(offset, endOffset)
tblInfo.MoveColumnInfo(offset, endOffset)
endOffset--

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the order does not matter, because the columns will be removed anyway.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will reduce the number of exchanges

}
// Cut off the tail, which now holds exactly the hidden columns moved above.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-len(hiddenColOffs)]
}

// removeIndexInfo deletes the index whose ID equals idxInfo.ID from
// tblInfo.Indices. It does nothing when no such index is present, e.g. because
// the target index has already been removed.
//
// The surviving indices keep their relative order. Swapping the target with
// the last element would silently reorder the table's index list whenever the
// removed index is not the last one.
func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
	indices := tblInfo.Indices
	offset := -1
	for i, idx := range indices {
		if idx.ID == idxInfo.ID {
			offset = i
			break
		}
	}
	if offset == -1 {
		// The target index has been removed.
		return
	}

	// Shift the following indices one slot to the left, then clear the vacated
	// tail slot so the backing array does not keep a stale pointer alive.
	last := len(indices) - 1
	copy(indices[offset:], indices[offset+1:])
	indices[last] = nil
	tblInfo.Indices = indices[:last]
}

func checkDropIndex(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
Expand Down
8 changes: 4 additions & 4 deletions ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := cloneFromSubJob(job, sub)
ver, err = w.runDDLJob(d, t, proxyJob)
mergeBackToSubJob(proxyJob, sub)
handleRevertibleException(job, sub.State, i)
mergeBackToSubJob(proxyJob, sub)
return ver, err
}
// All the sub-jobs are non-revertible.
Expand Down Expand Up @@ -95,7 +95,7 @@ func isFinished(job *model.SubJob) bool {

func cloneFromSubJob(job *model.Job, sub *model.SubJob) *model.Job {
return &model.Job{
ID: 0,
ID: job.ID,
Type: sub.Type,
SchemaID: job.SchemaID,
TableID: job.TableID,
Expand Down Expand Up @@ -135,10 +135,10 @@ func handleRevertibleException(job *model.Job, res model.JobState, idx int) {
if res == model.JobStateRollingback || res == model.JobStateCancelling {
job.State = res
}
// Flush the rollback state and cancelled state to sub-jobs.
// Flush the cancelling state and cancelled state to sub-jobs.
for i, sub := range job.MultiSchemaInfo.SubJobs {
if i < idx {
sub.State = model.JobStateRollingback
sub.State = model.JobStateCancelling
}
if i > idx {
sub.State = model.JobStateCancelled
Expand Down
44 changes: 44 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
package ddl_test

import (
"context"
"testing"

"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/admin"
"github.com/stretchr/testify/require"
)

func TestMultiSchemaChangeAddColumns(t *testing.T) {
Expand Down Expand Up @@ -71,6 +77,38 @@ func TestMultiSchemaChangeAddColumns(t *testing.T) {
tk.MustGetErrCode("alter table t add column b int default 2, add column b int default 3", errno.ErrUnsupportedDDLOperation)
}

// TestMultiSchemaChangeAddColumnsCancelled cancels a multi-schema-change
// ADD COLUMN job once its second sub-job reaches the write-reorganization
// state. It expects the statement to fail with ErrCancelledDDLJob and the
// table to still return only its original column.
func TestMultiSchemaChangeAddColumnsCancelled(t *testing.T) {
	store, dom, clean := testkit.CreateMockStoreAndDomain(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)
	tk.MustExec("use test")
	tk.MustExec("set @@global.tidb_enable_change_multi_schema = 1")

	tk.MustExec("create table t (a int);")
	tk.MustExec("insert into t values (1);")
	var checkErr error
	var once bool
	hook := func(job *model.Job) {
		// Ignore jobs that carry no (or too few) sub-jobs instead of panicking
		// inside the DDL worker.
		if once || job.MultiSchemaInfo == nil || len(job.MultiSchemaInfo.SubJobs) < 2 {
			return
		}
		if job.MultiSchemaInfo.SubJobs[1].SchemaState != model.StateWriteReorganization {
			return
		}
		once = true
		checkErr = kv.RunInNewTxn(context.Background(), store, false,
			func(ctx context.Context, txn kv.Transaction) error {
				errs, err := admin.CancelJobs(txn, []int64{job.ID})
				// Check err before indexing errs.
				if err != nil {
					return err
				}
				if len(errs) > 0 && errs[0] != nil {
					return errs[0]
				}
				return nil
			})
	}
	dom.DDL().SetHook(&ddl.TestDDLCallback{Do: dom, OnJobUpdatedExported: hook})
	sql := "alter table t add column b int default 2, add column c int default 3, add column d int default 4;"
	tk.MustGetErrCode(sql, errno.ErrCancelledDDLJob)
	// Assert on the hook's result only after the DDL has run; the hook cannot
	// have fired before the statement is executed.
	require.NoError(t, checkErr)
	tk.MustQuery("select * from t;").Check(testkit.Rows("1"))
}

func TestMultiSchemaChangeDropColumns(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down Expand Up @@ -191,6 +229,12 @@ func TestMultiSchemaChangeDropIndexes(t *testing.T) {
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, drop index t", errno.ErrUnsupportedDDLOperation)

tk.MustExec("create table t (id int, c1 int, c2 int, primary key(id), key i1(c1), key i2(c2));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t drop index i1, drop index i2;")
tk.MustGetErrCode("select * from t use index(i1);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(i2);", errno.ErrKeyDoesNotExist)

// Test drop index with drop column.
/*
tk.MustExec("drop table if exists t")
Expand Down
5 changes: 5 additions & 0 deletions util/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ func IsJobRollbackable(job *model.Job) bool {
model.ActionModifySchemaCharsetAndCollate, model.ActionRepairTable,
model.ActionModifyTableAutoIdCache, model.ActionModifySchemaDefaultPlacement:
return job.SchemaState == model.StateNone
case model.ActionMultiSchemaChange:
return job.MultiSchemaInfo.Revertible
}
return true
}
Expand Down Expand Up @@ -166,6 +168,9 @@ func CancelJobs(txn kv.Transaction, ids []int64) ([]error, error) {
errs[i] = ErrCannotCancelDDLJob.GenWithStackByArgs(job.ID)
continue
}
if job.IsCancelling() {
continue
}

job.State = model.JobStateCancelling
// Make sure RawArgs isn't overwritten.
Expand Down