Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: support add indexes for multi-schema change #35989

Merged
merged 13 commits into from
Jul 7, 2022
20 changes: 2 additions & 18 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3078,7 +3078,8 @@ func checkMultiSpecs(sctx sessionctx.Context, specs []*ast.AlterTableSpec) error
func allSupported(specs []*ast.AlterTableSpec) bool {
for _, s := range specs {
switch s.Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey:
case ast.AlterTableAddColumns, ast.AlterTableDropColumn, ast.AlterTableDropIndex, ast.AlterTableDropPrimaryKey,
ast.AlterTableAddConstraint:
default:
return false
}
Expand Down Expand Up @@ -3115,23 +3116,6 @@ func (d *ddl) AlterTable(ctx context.Context, sctx sessionctx.Context, stmt *ast
return err
}

if len(validSpecs) > 1 {
useMultiSchemaChange := false
switch validSpecs[0].Tp {
case ast.AlterTableAddColumns, ast.AlterTableDropColumn,
ast.AlterTableDropPrimaryKey, ast.AlterTableDropIndex:
useMultiSchemaChange = true
default:
return dbterror.ErrRunMultiSchemaChanges
}
if err != nil {
return errors.Trace(err)
}
if !useMultiSchemaChange {
return nil
}
}

if len(validSpecs) > 1 {
sctx.GetSessionVars().StmtCtx.MultiSchemaInfo = model.NewMultiSchemaInfo()
}
Expand Down
18 changes: 13 additions & 5 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,17 +414,25 @@ func (w *worker) updateDDLJob(t *meta.Meta, job *model.Job, meetErr bool) error
failpoint.Return(kv.ErrEntryTooLarge)
}
})
updateRawArgs := true
// If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs,
// so we shouldn't replace RawArgs with the marshaling Args.
if meetErr && (job.RawArgs != nil && job.Args == nil) {
updateRawArgs := needUpdateRawArgs(job, meetErr)
if !updateRawArgs {
logutil.Logger(w.logCtx).Info("[ddl] meet something wrong before update DDL job, shouldn't update raw args",
zap.String("job", job.String()))
updateRawArgs = false
}
return errors.Trace(t.UpdateDDLJob(0, job, updateRawArgs))
}

func needUpdateRawArgs(job *model.Job, meetErr bool) bool {
// If there is an error when running job and the RawArgs hasn't been decoded by DecodeArgs,
// we shouldn't replace RawArgs with the marshaling Args.
if meetErr && job.RawArgs != nil && job.Args == nil {
// However, for multi-schema change, the args of the parent job is always nil.
// Since Job.Encode() can handle the sub-jobs properly, we can safely update the raw args.
return job.MultiSchemaInfo != nil
}
return true
}

func (w *worker) deleteRange(ctx context.Context, job *model.Job) error {
var err error
if job.Version <= currentVersion {
Expand Down
57 changes: 46 additions & 11 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,31 @@ func checkPrimaryKeyNotNull(d *ddlCtx, w *worker, sqlMode mysql.SQLMode, t *meta
return nil, err
}

func updateHiddenColumns(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, state model.SchemaState) {
// moveAndUpdateHiddenColumnsToPublic updates the hidden columns to public, and
// moves the hidden columns to proper offsets, so that Table.Columns' states meet the assumption of
// [public, public, ..., public, non-public, non-public, ..., non-public].
func moveAndUpdateHiddenColumnsToPublic(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) {
hiddenColOffset := make(map[int]struct{}, 0)
for _, col := range idxInfo.Columns {
if tblInfo.Columns[col.Offset].Hidden {
tblInfo.Columns[col.Offset].State = state
hiddenColOffset[col.Offset] = struct{}{}
}
}
if len(hiddenColOffset) == 0 {
return
}
// Find the first non-public column.
firstNonPublicPos := len(tblInfo.Columns) - 1
for i, c := range tblInfo.Columns {
if c.State != model.StatePublic {
firstNonPublicPos = i
break
}
}
for _, col := range idxInfo.Columns {
tblInfo.Columns[col.Offset].State = model.StatePublic
if _, needMove := hiddenColOffset[col.Offset]; needMove {
tblInfo.MoveColumnInfo(col.Offset, firstNonPublicPos)
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -469,12 +490,9 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo

if indexInfo == nil {
if len(hiddenCols) > 0 {
pos := &ast.ColumnPosition{Tp: ast.ColumnPositionNone}
for _, hiddenCol := range hiddenCols {
_, _, _, err = createColumnInfoWithPosCheck(tblInfo, hiddenCol, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
if len(hiddenCols) > 0 {
tangenta marked this conversation as resolved.
Show resolved Hide resolved
for _, hiddenCol := range hiddenCols {
initAndAddColumnToTable(tblInfo, hiddenCol)
}
}
}
Expand Down Expand Up @@ -532,7 +550,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
case model.StateNone:
// none -> delete only
indexInfo.State = model.StateDeleteOnly
updateHiddenColumns(tblInfo, indexInfo, model.StatePublic)
moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, err
Expand Down Expand Up @@ -573,19 +591,23 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
}

var done bool
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if job.MultiSchemaInfo != nil {
done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo)
} else {
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
}
if !done {
return ver, err
}

indexInfo.State = model.StatePublic
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
if isPK {
if err = updateColsNull2NotNull(tblInfo, indexInfo); err != nil {
return ver, errors.Trace(err)
}
}
indexInfo.State = model.StatePublic
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
if err != nil {
return ver, errors.Trace(err)
Expand All @@ -599,6 +621,19 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
return ver, errors.Trace(err)
}

func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
if done {
job.MarkNonRevertible()
done = false // We need another round to wait for all the others sub-jobs to finish.
}
return done, ver, err
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}
return true, ver, err
}

func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) {
elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}}
Expand Down
36 changes: 35 additions & 1 deletion ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ func onMultiSchemaChange(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job) (ve
}
proxyJob := sub.ToProxyJob(job)
ver, err = w.runDDLJob(d, t, &proxyJob)
err = handleRollbackException(err, proxyJob.Error)
if err != nil {
return ver, err
}
sub.FromProxyJob(&proxyJob)
return ver, err
return ver, nil
}
// The last rollback/cancelling sub-job is done.
job.State = model.JobStateRollbackDone
Expand Down Expand Up @@ -154,6 +158,22 @@ func handleRevertibleException(job *model.Job, subJob *model.SubJob, err *terror
}
}

func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error {
if runJobErr != nil {
// The physical errors are not recoverable during rolling back.
// We keep retrying it.
return runJobErr
}
if proxyJobErr != nil {
if proxyJobErr.Equal(dbterror.ErrCancelledDDLJob) {
// A cancelled DDL error is normal during rolling back.
return nil
}
return proxyJobErr
}
return nil
}

func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
err := fillMultiSchemaInfo(m, job)
if err != nil {
Expand Down Expand Up @@ -189,6 +209,20 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(model.CIStr)
info.DropIndexes = append(info.DropIndexes, indexName)
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexName := job.Args[1].(model.CIStr)
indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification)
info.AddIndexes = append(info.AddIndexes, indexName)
for _, indexPartSpecification := range indexPartSpecifications {
info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name)
}
if hiddenCols, ok := job.Args[4].([]*model.ColumnInfo); ok {
for _, c := range hiddenCols {
for depColName := range c.Dependences {
info.RelativeColumns = append(info.RelativeColumns, model.NewCIStr(depColName))
}
}
}
default:
return dbterror.ErrRunMultiSchemaChanges
}
Expand Down
105 changes: 105 additions & 0 deletions ddl/multi_schema_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,84 @@ func TestMultiSchemaChangeAddDropColumns(t *testing.T) {
tk.MustGetErrCode("alter table t add column c int default 3 after a, add column d int default 4 first, drop column a, drop column b;", errno.ErrUnsupportedDDLOperation)
}

func TestMultiSchemaChangeAddIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Test add multiple indexes with same column.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index t(a, b), add index t1(a);")
tk.MustExec("alter table t add index t2(a), add index t3(a, b);")
tk.MustQuery("select * from t use index (t, t1, t2, t3);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")

// Test add multiple indexes with same name.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
tk.MustGetErrCode("alter table t add index t(a), add index t(b)", errno.ErrUnsupportedDDLOperation)
tangenta marked this conversation as resolved.
Show resolved Hide resolved

// Test add indexes with drop column.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int)")
tk.MustGetErrCode("alter table t add index t(a), drop column a", errno.ErrUnsupportedDDLOperation)
tk.MustGetErrCode("alter table t add index t(a, b), drop column a", errno.ErrUnsupportedDDLOperation)

// Test add index failed.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 1);")
tk.MustGetErrCode("alter table t add unique index i1(a), add unique index i2(a, b), add unique index i3(c);",
errno.ErrDupEntry)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
tk.MustExec("alter table t add index i1(a), add index i2(a, b), add index i3(c);")
tangenta marked this conversation as resolved.
Show resolved Hide resolved
}

func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
originHook := dom.DDL().GetHook()

// Test cancel successfully.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
cancelHook := newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't2' is in write-reorg.
return job.MultiSchemaInfo.SubJobs[2].SchemaState == model.StateWriteReorganization
})
dom.DDL().SetHook(cancelHook)
tk.MustGetErrCode("alter table t "+
"add index t(a, b), add index t1(a), "+
"add index t2(a), add index t3(a, b);", errno.ErrCancelledDDLJob)
dom.DDL().SetHook(originHook)
cancelHook.MustCancelDone(t)
tk.MustQuery("show index from t;").Check(testkit.Rows( /* no index */ ))
tk.MustQuery("select * from t;").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")

// Test cancel failed when some sub-jobs have been finished.
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int);")
tk.MustExec("insert into t values (1, 2, 3);")
cancelHook = newCancelJobHook(store, dom, func(job *model.Job) bool {
// Cancel the job when index 't1' is in public.
return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic
})
dom.DDL().SetHook(cancelHook)
tk.MustExec("alter table t add index t(a, b), add index t1(a), " +
"add index t2(a), add index t3(a, b);")
dom.DDL().SetHook(originHook)
cancelHook.MustCancelFailed(t)
tk.MustQuery("select * from t use index(t, t1, t2, t3);").Check(testkit.Rows("1 2 3"))
tk.MustExec("admin check table t;")
}

func TestMultiSchemaChangeDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down Expand Up @@ -372,6 +450,33 @@ func TestMultiSchemaChangeDropIndexesParallel(t *testing.T) {
})
}

func TestMultiSchemaChangeAddDropIndexes(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

// Test add and drop same index.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t drop index t, add index t(b)", errno.ErrDupKeyName)

// Test add and drop same index.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a int, b int, c int, index t(a))")
tk.MustGetErrCode("alter table t add index t1(b), drop index t1", errno.ErrCantDropFieldOrKey)

tk.MustExec("drop table if exists t;")
tk.MustExec("create table t (a int, b int, c int, index (a), index(b), index(c));")
tk.MustExec("insert into t values (1, 2, 3);")
tk.MustExec("alter table t add index aa(a), drop index a, add index cc(c), drop index b, drop index c, add index bb(b);")
tk.MustQuery("select * from t use index(aa, bb, cc);").Check(testkit.Rows("1 2 3"))
tk.MustGetErrCode("select * from t use index(a);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(b);", errno.ErrKeyDoesNotExist)
tk.MustGetErrCode("select * from t use index(c);", errno.ErrKeyDoesNotExist)
tk.MustExec("admin check table t;")
}

type cancelOnceHook struct {
store kv.Storage
triggered bool
Expand Down
24 changes: 18 additions & 6 deletions ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblIn
return ver, errors.Trace(err)
}

// convertNotStartAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,
// convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob,
// to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started.
func convertNotStartAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {
func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, occuredErr error) (ver int64, err error) {
xiongjiwei marked this conversation as resolved.
Show resolved Hide resolved
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -228,19 +228,31 @@ func rollingbackDropIndex(t *meta.Meta, job *model.Job) (ver int64, err error) {
}

func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isPK bool) (ver int64, err error) {
// If the value of SnapshotVer isn't zero, it means the work is backfilling the indexes.
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
if needNotifyAndStopReorgWorker(job) {
// add index workers are started. need to ask them to exit.
logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String()))
d.notifyReorgCancel(job)
ver, err = w.onCreateIndex(d, t, job, isPK)
} else {
// add index workers are not started, remove the indexInfo in tableInfo.
ver, err = convertNotStartAddIdxJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob)
// add index's reorg workers are not running, remove the indexInfo in tableInfo.
ver, err = convertNotReorgAddIdxJob2RollbackJob(d, t, job, dbterror.ErrCancelledDDLJob)
}
return
}

func needNotifyAndStopReorgWorker(job *model.Job) bool {
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
// If the value of SnapshotVer isn't zero, it means the worker is backfilling the indexes.
if job.MultiSchemaInfo != nil {
// However, if the sub-job is non-revertible, it means the reorg process is finished.
// We don't need to start another round to notify reorg workers to exit.
return job.MultiSchemaInfo.Revertible
}
return true
}
return false
}

func convertAddTablePartitionJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) {
addingDefinitions := tblInfo.Partition.AddingDefinitions
partNames := make([]string, 0, len(addingDefinitions))
Expand Down
Loading