From 00654a397c7298a99989d6c5c22106257a238eb4 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Tue, 21 Feb 2023 14:59:17 +0800 Subject: [PATCH 1/3] support add indexes in multi-schema change --- ddl/delete_range.go | 58 +++++++++++++++---------- ddl/multi_schema_change.go | 40 +++++++++++++++++ ddl/multi_schema_change_test.go | 10 ++--- ddl/rollingback.go | 77 +++++++++++++++++++-------------- 4 files changed, 125 insertions(+), 60 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 8734d2c482968..2413208b88ab5 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -326,24 +326,28 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, } // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. case model.ActionAddIndex, model.ActionAddPrimaryKey: - var indexID int64 - var ifExists bool + tableID := job.TableID + indexID := make([]int64, 1) + ifExists := make([]bool, 1) var partitionIDs []int64 - if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil { - return errors.Trace(err) + if err := job.DecodeArgs(&indexID[0], &ifExists[0], &partitionIDs); err != nil { + if err = job.DecodeArgs(&indexID, &ifExists, &partitionIDs); err != nil { + return errors.Trace(err) + } } // Determine the physicalIDs to be added. physicalIDs := []int64{job.TableID} if len(partitionIDs) > 0 { physicalIDs = partitionIDs } - // Determine the index IDs to be added. - tempIdxID := tablecodec.TempIndexPrefix | indexID var indexIDs []int64 - if job.State == model.JobStateRollbackDone { - indexIDs = []int64{indexID, tempIdxID} - } else { - indexIDs = []int64{tempIdxID} + for _, idxID := range indexID { + // Determine the index IDs to be added. + tempIdxID := tablecodec.TempIndexPrefix | idxID + if job.State == model.JobStateRollbackDone { + indexIDs = append(indexIDs, idxID) + } + indexIDs = append(indexIDs, tempIdxID) } for _, pid := range physicalIDs { for _, iid := range indexIDs { @@ -358,26 +362,34 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, case model.ActionDropIndex, model.ActionDropPrimaryKey: tableID := job.TableID var indexName interface{} - var ifExists bool - var indexID int64 var partitionIDs []int64 - if err := job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil { - return errors.Trace(err) + ifExists := make([]bool, 1) + indexID := make([]int64, 1) + if err := job.DecodeArgs(&indexName, &ifExists[0], &indexID[0], &partitionIDs); err != nil { + if err = job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil { + return errors.Trace(err) + } } if len(partitionIDs) > 0 { for _, pid := range partitionIDs { - startKey := tablecodec.EncodeTableIndexPrefix(pid, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(pid, indexID+1) - elemID := ea.allocForIndexID(pid, indexID) - if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { - return errors.Trace(err) + for _, idxID := range indexID { + startKey := tablecodec.EncodeTableIndexPrefix(pid, idxID) + endKey := tablecodec.EncodeTableIndexPrefix(pid, idxID+1) + elemID := ea.allocForIndexID(pid, idxID) + if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("partition table ID is %d", pid)); err != nil { + return errors.Trace(err) + } } } } else { - startKey := tablecodec.EncodeTableIndexPrefix(tableID, indexID) - endKey := tablecodec.EncodeTableIndexPrefix(tableID, 
indexID+1) - elemID := ea.allocForIndexID(tableID, indexID) - return doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", indexID)) + for _, idxID := range indexID { + startKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID) + endKey := tablecodec.EncodeTableIndexPrefix(tableID, idxID+1) + elemID := ea.allocForIndexID(tableID, idxID) + if err := doInsert(ctx, s, job.ID, elemID, startKey, endKey, now, fmt.Sprintf("index ID is %d", idxID)); err != nil { + return errors.Trace(err) + } + } } case model.ActionDropColumn: var colName model.CIStr diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 19c355ebfa8da..4ea85f94b9f81 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -55,6 +55,7 @@ func (d *ddl) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident) error { if err != nil { return errors.Trace(err) } + mergeAddIndex(ctx.GetSessionVars().StmtCtx.MultiSchemaInfo) ctx.GetSessionVars().StmtCtx.MultiSchemaInfo = nil err = d.DoDDLJob(ctx, job) return d.callHookOnChanged(job, err) @@ -357,6 +358,45 @@ func checkOperateDropIndexUseByForeignKey(info *model.MultiSchemaInfo, t table.T return nil } +func mergeAddIndex(info *model.MultiSchemaInfo) { + var newSubJob *model.SubJob + var unique []bool + var indexNames []model.CIStr + var indexPartSpecifications [][]*ast.IndexPartSpecification + var indexOption []*ast.IndexOption + var hiddenCols [][]*model.ColumnInfo + var global []bool + + newSubJobs := make([]*model.SubJob, 0, len(info.SubJobs)) + for _, subJob := range info.SubJobs { + if subJob.Type == model.ActionAddIndex { + if newSubJob == nil { + newSubJob = new(model.SubJob) + newSubJob.Type = model.ActionAddIndex + newSubJob.Args = nil + newSubJob.RawArgs = nil + newSubJob.SchemaState = subJob.SchemaState + newSubJob.SnapshotVer = subJob.SnapshotVer + newSubJob.Revertible = true + newSubJob.CtxVars = subJob.CtxVars + } + unique = append(unique, subJob.Args[0].(bool)) + indexNames = append(indexNames, subJob.Args[1].(model.CIStr)) + indexPartSpecifications = append(indexPartSpecifications, subJob.Args[2].([]*ast.IndexPartSpecification)) + indexOption = append(indexOption, subJob.Args[3].(*ast.IndexOption)) + hiddenCols = append(hiddenCols, subJob.Args[4].([]*model.ColumnInfo)) + global = append(global, subJob.Args[5].(bool)) + } else { + newSubJobs = append(newSubJobs, subJob) + } + } + if newSubJob != nil { + newSubJob.Args = []interface{}{unique, indexNames, indexPartSpecifications, indexOption, hiddenCols, global} + newSubJobs = append(newSubJobs, newSubJob) + info.SubJobs = newSubJobs + } +} + func checkMultiSchemaInfo(info *model.MultiSchemaInfo, t table.Table) error { err := checkOperateSameColAndIdx(info) if err != nil { diff --git a/ddl/multi_schema_change_test.go b/ddl/multi_schema_change_test.go index 1f6a52bcce244..8c8d797f77519 100644 --- a/ddl/multi_schema_change_test.go +++ b/ddl/multi_schema_change_test.go @@ -576,12 +576,12 @@ func TestMultiSchemaChangeAddIndexesCancelled(t *testing.T) { tk.MustExec("create table t (a int, b int, c int);") tk.MustExec("insert into t values (1, 2, 3);") cancelHook := newCancelJobHook(t, store, dom, func(job *model.Job) bool { - // Cancel the job when index 't2' is in write-reorg. + // Cancel the job when index is in write-reorg. 
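+ // After mergeAddIndex collapses the `add index` sub-jobs into a single sub-job,
+ // the job carries only one sub-job, hence the SubJobs[0] assertions below.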
 if job.Type != model.ActionMultiSchemaChange {
 return false
 }
- assertMultiSchema(t, job, 4)
- return job.MultiSchemaInfo.SubJobs[2].SchemaState == model.StateWriteReorganization
+ assertMultiSchema(t, job, 1)
+ return job.MultiSchemaInfo.SubJobs[0].SchemaState == model.StateWriteReorganization
 })
 dom.DDL().SetHook(cancelHook)
 tk.MustGetErrCode("alter table t "+
@@ -602,8 +602,8 @@
 if job.Type != model.ActionMultiSchemaChange {
 return false
 }
- assertMultiSchema(t, job, 4)
- return job.MultiSchemaInfo.SubJobs[1].SchemaState == model.StatePublic
+ assertMultiSchema(t, job, 1)
+ return job.MultiSchemaInfo.SubJobs[0].SchemaState == model.StatePublic
 })
 dom.DDL().SetHook(cancelHook)
 tk.MustExec("alter table t add index t(a, b), add index t1(a), " +
diff --git a/ddl/rollingback.go b/ddl/rollingback.go
index c6f75442479b6..c01ed99a713b8 100644
--- a/ddl/rollingback.go
+++ b/ddl/rollingback.go
@@ -44,33 +44,38 @@ func UpdateColsNull2NotNull(tblInfo *model.TableInfo, indexInfo *model.IndexInfo
 return nil
 }

-func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo *model.IndexInfo, err error) (int64, error) {
+func convertAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Job, tblInfo *model.TableInfo, indexInfo []*model.IndexInfo, err error) (int64, error) {
 failpoint.Inject("mockConvertAddIdxJob2RollbackJobError", func(val failpoint.Value) {
 if val.(bool) {
 failpoint.Return(0, errors.New("mock convert add index job to rollback job error"))
 }
 })

- if indexInfo.Primary {
- nullCols, err := getNullColInfos(tblInfo, indexInfo)
- if err != nil {
- return 0, errors.Trace(err)
- }
- for _, col := range nullCols {
- // Field PreventNullInsertFlag flag reset.
- col.DelFlag(mysql.PreventNullInsertFlag)
+ originalState := indexInfo[0].State
+ idxName := make([]model.CIStr, 0, len(indexInfo))
+ ifExists := make([]bool, 0, len(indexInfo))
+ for _, idx := range indexInfo {
+ if idx.Primary {
+ nullCols, err := getNullColInfos(tblInfo, idx)
+ if err != nil {
+ return 0, errors.Trace(err)
+ }
+ for _, col := range nullCols {
+ // Field PreventNullInsertFlag flag reset.
+ col.DelFlag(mysql.PreventNullInsertFlag)
+ }
 }
- }
-
- // the second and the third args will be used in onDropIndex.
- job.Args = []interface{}{indexInfo.Name, false /* ifExists */, getPartitionIDs(tblInfo)}
- // If add index job rollbacks in write reorganization state, its need to delete all keys which has been added.
- // Its work is the same as drop index job do.
- // The write reorganization state in add index job that likes write only state in drop index job.
- // So the next state is delete only state.
- originalState := indexInfo.State
- indexInfo.State = model.StateDeleteOnly
+ // If an add-index job rolls back in the write-reorganization state, it needs to delete all keys that have been added.
+ // Its work is the same as what a drop-index job does.
+ // The write-reorganization state in an add-index job is like the write-only state in a drop-index job.
+ // So the next state is the delete-only state.
+ idx.State = model.StateDeleteOnly
+ idxName = append(idxName, idx.Name)
+ ifExists = append(ifExists, false)
+ }
+ // The index names and ifExists flags will be used in onDropIndex.
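+ // Compatibility note: these args used to be (indexName model.CIStr, ifExists bool, partitionIDs []int64);
+ // the slice layout lets a single rollback job cover every index added by one multi-schema change.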
+ job.Args = []interface{}{idxName, ifExists, getPartitionIDs(tblInfo)} job.SchemaState = model.StateDeleteOnly - ver, err1 := updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State) + ver, err1 := updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo[0].State) if err1 != nil { return ver, errors.Trace(err1) } @@ -87,24 +92,32 @@ func convertNotReorgAddIdxJob2RollbackJob(d *ddlCtx, t *meta.Meta, job *model.Jo return ver, errors.Trace(err) } - var ( - unique bool - indexName model.CIStr - indexPartSpecifications []*ast.IndexPartSpecification - indexOption *ast.IndexOption - ) - err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption) + unique := make([]bool, 1) + indexName := make([]model.CIStr, 1) + indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1) + indexOption := make([]*ast.IndexOption, 1) + + err = job.DecodeArgs(&unique[0], &indexName[0], &indexPartSpecifications[0], &indexOption[0]) + if err != nil { + err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption) + } if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - indexInfo := tblInfo.FindIndexByName(indexName.L) - if indexInfo == nil { + var indexesInfo []*model.IndexInfo + for _, idxName := range indexName { + indexInfo := tblInfo.FindIndexByName(idxName.L) + if indexInfo != nil { + indexesInfo = append(indexesInfo, indexInfo) + } + } + if len(indexesInfo) == 0 { job.State = model.JobStateCancelled return ver, dbterror.ErrCancelledDDLJob } - return convertAddIdxJob2RollbackJob(d, t, job, tblInfo, indexInfo, occuredErr) + return convertAddIdxJob2RollbackJob(d, t, job, tblInfo, indexesInfo, occuredErr) } // rollingbackModifyColumn change the modifying-column job into rolling back state. @@ -213,7 +226,7 @@ func rollingbackDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, e return ver, errors.Trace(err) } - switch indexInfo.State { + switch indexInfo[0].State { case model.StateWriteOnly, model.StateDeleteOnly, model.StateDeleteReorganization, model.StateNone: // We can not rollback now, so just continue to drop index. // Normally won't fetch here, because there is check when cancel ddl jobs. see function: isJobRollbackable. @@ -223,7 +236,7 @@ func rollingbackDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, e job.State = model.JobStateCancelled return ver, dbterror.ErrCancelledDDLJob default: - return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo.State) + return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo[0].State) } } From 546c3574a3ba9f75c34533210012c15ef0f932bd Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Wed, 22 Feb 2023 14:13:46 +0800 Subject: [PATCH 2/3] update --- ddl/delete_range.go | 1 - ddl/index.go | 231 ++++++++++++++++++++----------------- ddl/multi_schema_change.go | 21 +++- 3 files changed, 141 insertions(+), 112 deletions(-) diff --git a/ddl/delete_range.go b/ddl/delete_range.go index 2413208b88ab5..d94f05db346bc 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -326,7 +326,6 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, } // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. 
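+ // Since sub-jobs may be merged into one job, the args can arrive either in the legacy
+ // scalar form (a single index ID) or in the slice form, so both decodings are attempted.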
 case model.ActionAddIndex, model.ActionAddPrimaryKey:
- tableID := job.TableID
 indexID := make([]int64, 1)
 ifExists := make([]bool, 1)
 var partitionIDs []int64
diff --git a/ddl/index.go b/ddl/index.go
index 736fce54bcfcf..d6c191eeda802 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -468,7 +468,8 @@ func checkPrimaryKeyNotNull(d *ddlCtx, w *worker, t *meta.Meta, job *model.Job,
 if err == nil {
 return nil, nil
 }
- _, err = convertAddIdxJob2RollbackJob(d, t, job, tblInfo, indexInfo, err)
+ // addIndexes does not support primary keys, so rolling back only the primary key is enough.
+ _, err = convertAddIdxJob2RollbackJob(d, t, job, tblInfo, []*model.IndexInfo{indexInfo}, err)
 // TODO: Support non-strict mode.
 // warnings = append(warnings, ErrWarnDataTruncated.GenWithStackByArgs(oldCol.Name.L, 0).Error())
 return nil, err
@@ -523,132 +524,144 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo
 return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Create Index"))
 }

- var (
- unique bool
- global bool
- indexName model.CIStr
- indexPartSpecifications []*ast.IndexPartSpecification
- indexOption *ast.IndexOption
- sqlMode mysql.SQLMode
- warnings []string
- hiddenCols []*model.ColumnInfo
- )
+ unique := make([]bool, 1)
+ global := make([]bool, 1)
+ indexName := make([]model.CIStr, 1)
+ indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1)
+ indexOption := make([]*ast.IndexOption, 1)
+ var sqlMode mysql.SQLMode
+ var warnings []string
+ hiddenCols := make([][]*model.ColumnInfo, 1)
+ isPKSlice := make([]bool, 1)
+
 if isPK {
 // Notice: sqlMode and warnings is used to support non-strict mode.
- err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption, &sqlMode, &warnings, &global)
+ err = job.DecodeArgs(&unique[0], &indexName[0], &indexPartSpecifications[0], &indexOption[0], &sqlMode, &warnings, &global[0])
+ isPKSlice[0] = true
 } else {
- err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption, &hiddenCols, &global)
+ err = job.DecodeArgs(&unique[0], &indexName[0], &indexPartSpecifications[0], &indexOption[0], &hiddenCols[0], &global[0])
+ isPKSlice[0] = false
 }
 if err != nil {
- job.State = model.JobStateCancelled
- return ver, errors.Trace(err)
- }
-
- indexInfo := tblInfo.FindIndexByName(indexName.L)
- if indexInfo != nil && indexInfo.State == model.StatePublic {
- job.State = model.JobStateCancelled
- err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s", indexName)
- if isPK {
- err = infoschema.ErrMultiplePriKey
+ if job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption, &hiddenCols, &global, &isPKSlice, &sqlMode, &warnings) != nil {
+ job.State = model.JobStateCancelled
+ return ver, errors.Trace(err)
 }
- return ver, err
 }
- if indexInfo == nil {
- for _, hiddenCol := range hiddenCols {
- columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L)
- if columnInfo != nil && columnInfo.State == model.StatePublic {
- // We already have a column with the same column name.
- job.State = model.JobStateCancelled
- // TODO: refine the error message
- return ver, infoschema.ErrColumnExists.GenWithStackByArgs(hiddenCol.Name)
- }
- }
- }
- if indexInfo == nil {
- if len(hiddenCols) > 0 {
- for _, hiddenCol := range hiddenCols {
- InitAndAddColumnToTable(tblInfo, hiddenCol)
+ var indexesInfo []*model.IndexInfo
+ for i, idxName := range indexName {
+ indexInfo := tblInfo.FindIndexByName(idxName.L)
+ if indexInfo != nil && indexInfo.State == model.StatePublic {
+ job.State = model.JobStateCancelled
+ err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s", idxName)
+ if isPKSlice[i] {
+ err = infoschema.ErrMultiplePriKey
+ }
+ return ver, err
+ }
+ if indexInfo == nil {
+ // TODO: fix it later
+ for _, hiddenCol := range hiddenCols[i] {
+ columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L)
+ if columnInfo != nil && columnInfo.State == model.StatePublic {
+ // We already have a column with the same column name.
+ job.State = model.JobStateCancelled
+ // TODO: refine the error message
+ return ver, infoschema.ErrColumnExists.GenWithStackByArgs(hiddenCol.Name)
+ }
+ }
+ }
+
+ if indexInfo == nil {
+ if len(hiddenCols[i]) > 0 {
+ for _, hiddenCol := range hiddenCols[i] {
+ InitAndAddColumnToTable(tblInfo, hiddenCol)
+ }
+ }
 if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
 job.State = model.JobStateCancelled
 return ver, errors.Trace(err)
 }
- indexInfo, err = BuildIndexInfo(
- nil,
- tblInfo.Columns,
- indexName,
- isPK,
- unique,
- global,
- indexPartSpecifications,
- indexOption,
- model.StateNone,
- )
- if err != nil {
- job.State = model.JobStateCancelled
- return ver, errors.Trace(err)
- }
- if isPK {
- if _, err = CheckPKOnGeneratedColumn(tblInfo, indexPartSpecifications); err != nil {
- job.State = model.JobStateCancelled
- return ver, err
- }
- }
- indexInfo.ID = AllocateIndexID(tblInfo)
- tblInfo.Indices = append(tblInfo.Indices, indexInfo)
- if err = checkTooManyIndexes(tblInfo.Indices); err != nil {
- job.State = model.JobStateCancelled
- return ver, errors.Trace(err)
- }
- // Here we need do this check before set state to `DeleteOnly`,
- // because if hidden columns has been set to `DeleteOnly`,
- // the `DeleteOnly` columns are missing when we do this check.
- if err := checkInvisibleIndexOnPK(tblInfo); err != nil {
- job.State = model.JobStateCancelled
- return ver, err
- }
- logutil.BgLogger().Info("[ddl] run add index job", zap.String("job", job.String()), zap.Reflect("indexInfo", indexInfo))
+ indexInfo, err = BuildIndexInfo(
+ nil,
+ tblInfo.Columns,
+ idxName,
+ isPKSlice[i],
+ unique[i],
+ global[i],
+ indexPartSpecifications[i],
+ indexOption[i],
+ model.StateNone,
+ )
+ if err != nil {
+ job.State = model.JobStateCancelled
+ return ver, errors.Trace(err)
+ }
+ if isPKSlice[i] {
+ if _, err = CheckPKOnGeneratedColumn(tblInfo, indexPartSpecifications[i]); err != nil {
+ job.State = model.JobStateCancelled
+ return ver, err
+ }
+ }
+ indexInfo.ID = AllocateIndexID(tblInfo)
+ tblInfo.Indices = append(tblInfo.Indices, indexInfo)
+ if err = checkTooManyIndexes(tblInfo.Indices); err != nil {
+ job.State = model.JobStateCancelled
+ return ver, errors.Trace(err)
+ }
+ // Here we need to do this check before setting the state to `DeleteOnly`,
+ // because if the hidden columns have been set to `DeleteOnly`,
+ // the `DeleteOnly` columns are missing when we do this check.
+ if err := checkInvisibleIndexOnPK(tblInfo); err != nil {
+ job.State = model.JobStateCancelled
+ return ver, err
+ }
+ logutil.BgLogger().Info("[ddl] run add index job", zap.String("job", job.String()), zap.Reflect("indexInfo", indexInfo))
+ }
 }
- originalState := indexInfo.State
- switch indexInfo.State {
+ originalState := indexesInfo[0].State
+ switch indexesInfo[0].State {
 case model.StateNone:
 // none -> delete only
 reorgTp := pickBackfillType(w, job)
- if reorgTp.NeedMergeProcess() {
- // Increase telemetryAddIndexIngestUsage
- telemetryAddIndexIngestUsage.Inc()
- indexInfo.BackfillState = model.BackfillStateRunning
+ for _, indexInfo := range indexesInfo {
+ if reorgTp.NeedMergeProcess() {
+ indexInfo.BackfillState = model.BackfillStateRunning
+ }
+ indexInfo.State = model.StateDeleteOnly
+ moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo)
 }
- indexInfo.State = model.StateDeleteOnly
- moveAndUpdateHiddenColumnsToPublic(tblInfo, indexInfo)
- ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexInfo.State)
+ ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != indexesInfo[0].State)
 if err != nil {
 return ver, err
 }
 job.SchemaState = model.StateDeleteOnly
 case model.StateDeleteOnly:
 // delete only -> write only
- indexInfo.State = model.StateWriteOnly
- _, err = checkPrimaryKeyNotNull(d, w, t, job, tblInfo, indexInfo)
- if err != nil {
- break
+ for _, indexInfo := range indexesInfo {
+ indexInfo.State = model.StateWriteOnly
+ _, err = checkPrimaryKeyNotNull(d, w, t, job, tblInfo, indexInfo)
+ if err != nil {
+ break
+ }
 }
- ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
+ if err != nil {
+ // Keep the error from checkPrimaryKeyNotNull instead of overwriting it below.
+ break
+ }
+ ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexesInfo[0].State)
 if err != nil {
 return ver, err
 }
 job.SchemaState = model.StateWriteOnly
 case model.StateWriteOnly:
 // write only -> reorganization
- indexInfo.State = model.StateWriteReorganization
- _, err = checkPrimaryKeyNotNull(d, w, t, job, tblInfo, indexInfo)
- if err != nil {
- break
+ for _, indexInfo := range indexesInfo {
+ indexInfo.State = model.StateWriteReorganization
+ _, err = checkPrimaryKeyNotNull(d, w, t, job, tblInfo, indexInfo)
+ if err != nil {
+ break
+ }
 }
- ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State)
+ if err != nil {
+ // Keep the error from checkPrimaryKeyNotNull instead of overwriting it below.
+ break
+ }
+ ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexesInfo[0].State)
 if err != nil {
 return ver, err
 }
@@ -668,31 +681,34 @@
 var done bool
 if job.MultiSchemaInfo != nil {
- done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexInfo)
+ done, ver, err = doReorgWorkForCreateIndexMultiSchema(w, d, t, job, tbl, indexesInfo)
 } else {
 if job.ReorgMeta.IsDistReorg {
- done, ver, err = doReorgWorkForCreateIndexWithDistReorg(w, d, t, job, tbl, indexInfo)
+ done, ver, err = doReorgWorkForCreateIndexWithDistReorg(w, d, t, job, tbl, indexesInfo)
 } else {
- done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo)
+ done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexesInfo)
 }
 }
 if !done {
 return ver, err
 }

- // Set column index flag.
- AddIndexColumnFlag(tblInfo, indexInfo)
- if isPK {
- if err = UpdateColsNull2NotNull(tblInfo, indexInfo); err != nil {
- return ver, errors.Trace(err)
+ indexIDs := make([]int64, 0, len(indexesInfo))
+ for _, indexInfo := range indexesInfo {
+ // Set column index flag.
+ AddIndexColumnFlag(tblInfo, indexInfo) + if isPK { + if err = UpdateColsNull2NotNull(tblInfo, indexInfo); err != nil { + return ver, errors.Trace(err) + } } + indexInfo.State = model.StatePublic } - indexInfo.State = model.StatePublic - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State) + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexesInfo[0].State) if err != nil { return ver, errors.Trace(err) } - job.Args = []interface{}{indexInfo.ID, false /*if exists*/, getPartitionIDs(tbl.Meta())} + job.Args = []interface{}{indexIDs, false /*if exists*/, getPartitionIDs(tbl.Meta())} // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { @@ -793,7 +809,7 @@ func tryFallbackToTxnMerge(job *model.Job, err error) error { } func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, - tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + tbl table.Table, indexInfo []*model.IndexInfo) (done bool, ver int64, err error) { if job.MultiSchemaInfo.Revertible { done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) if done { @@ -809,17 +825,18 @@ func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, jo } func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, - tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + tbl table.Table, indexesInfo []*model.IndexInfo) (done bool, ver int64, err error) { bfProcess := pickBackfillType(w, job) if !bfProcess.NeedMergeProcess() { - return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + return runReorgJobAndHandleErr(w, d, t, job, tbl, indexesInfo, false) } + indexInfo := indexesInfo[0] switch indexInfo.BackfillState { case model.BackfillStateRunning: logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge), - zap.String("index", indexInfo.Name.O)) + zap.String("indexes", indexInfo.Name.O)) switch bfProcess { case model.ReorgTypeLitMerge: bc, ok := ingest.LitBackCtxMgr.Load(job.ID) @@ -853,7 +870,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo if common.ErrFoundDuplicateKeys.Equal(err) { err = convertToKeyExistsErr(err, indexInfo, tbl.Meta()) } - ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) + ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err) } else { logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) err = tryFallbackToTxnMerge(job, err) diff --git a/ddl/multi_schema_change.go b/ddl/multi_schema_change.go index 4ea85f94b9f81..a06fedd6a513d 100644 --- a/ddl/multi_schema_change.go +++ b/ddl/multi_schema_change.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/parser/ast" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/table" @@ -366,10 +367,13 @@ func mergeAddIndex(info *model.MultiSchemaInfo) { var indexOption []*ast.IndexOption var hiddenCols [][]*model.ColumnInfo var global []bool + var isPK []bool + var sqlMode mysql.SQLMode + var warnings []string newSubJobs := make([]*model.SubJob, 0, 
len(info.SubJobs))
 for _, subJob := range info.SubJobs {
- if subJob.Type == model.ActionAddIndex {
+ if subJob.Type == model.ActionAddIndex || subJob.Type == model.ActionAddPrimaryKey {
 if newSubJob == nil {
 newSubJob = new(model.SubJob)
 newSubJob.Type = model.ActionAddIndex
@@ -384,14 +388,23 @@ func mergeAddIndex(info *model.MultiSchemaInfo) {
 indexNames = append(indexNames, subJob.Args[1].(model.CIStr))
 indexPartSpecifications = append(indexPartSpecifications, subJob.Args[2].([]*ast.IndexPartSpecification))
 indexOption = append(indexOption, subJob.Args[3].(*ast.IndexOption))
- hiddenCols = append(hiddenCols, subJob.Args[4].([]*model.ColumnInfo))
- global = append(global, subJob.Args[5].(bool))
+ if subJob.Type == model.ActionAddIndex {
+ hiddenCols = append(hiddenCols, subJob.Args[4].([]*model.ColumnInfo))
+ global = append(global, subJob.Args[5].(bool))
+ isPK = append(isPK, false)
+ } else if subJob.Type == model.ActionAddPrimaryKey {
+ hiddenCols = append(hiddenCols, nil)
+ sqlMode = subJob.Args[4].(mysql.SQLMode)
+ warnings = subJob.Args[5].([]string)
+ global = append(global, subJob.Args[6].(bool))
+ isPK = append(isPK, true)
+ }
 } else {
 newSubJobs = append(newSubJobs, subJob)
 }
 }
 if newSubJob != nil {
- newSubJob.Args = []interface{}{unique, indexNames, indexPartSpecifications, indexOption, hiddenCols, global}
+ newSubJob.Args = []interface{}{unique, indexNames, indexPartSpecifications, indexOption, hiddenCols, global, isPK, sqlMode, warnings}
 newSubJobs = append(newSubJobs, newSubJob)
 info.SubJobs = newSubJobs
 }

From a0c5dc11927ea134deda092d4d583c35d044a511 Mon Sep 17 00:00:00 2001
From: Hangjie Mo
Date: Thu, 23 Feb 2023 21:22:24 +0800
Subject: [PATCH 3/3] update

---
 ddl/backfilling.go | 21 ++--
 ddl/delete_range.go | 4 +-
 ddl/index.go | 223 +++++++++++++++++++++++++-----------------
 ddl/index_cop.go | 37 ++++---
 ddl/index_cop_test.go | 2 +-
 5 files changed, 172 insertions(+), 115 deletions(-)

diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index 25881476c71da..5eed1b9a53088 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -922,8 +922,7 @@ func (b *backfillScheduler) adjustWorkerSize() error {
 switch b.tp {
 case typeAddIndexWorker:
 backfillCtx := newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.ReorgMeta.ReorgTp, job.SchemaName, b.tbl, false)
- idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx,
- jc, job.ID, reorgInfo.currElement.ID, reorgInfo.currElement.TypeKey)
+ idxWorker, err := newAddIndexWorker(b.decodeColMap, b.tbl, backfillCtx, jc, job.ID, reorgInfo.elements)
 if err != nil {
 if canSkipError(b.reorgInfo.ID, len(b.workers), err) {
 continue
@@ -980,18 +979,24 @@ func (b *backfillScheduler) initCopReqSenderPool() {
 b.copReqSenderPool != nil || len(b.workers) > 0 {
 return
 }
- indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, b.reorgInfo.currElement.ID)
- if indexInfo == nil {
- logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
- zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
- return
+
+ indexesInfo := make([]*model.IndexInfo, 0, len(b.reorgInfo.elements))
+
+ for _, ele := range b.reorgInfo.elements {
+ indexInfo := model.FindIndexInfoByID(b.tbl.Meta().Indices, ele.ID)
+ if indexInfo == nil {
+ logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender",
+ zap.Int64("table ID", b.tbl.Meta().ID), zap.Int64("index ID", ele.ID))
+ return
+ }
+ indexesInfo = append(indexesInfo, indexInfo)
 }
 sessCtx, err := b.newSessCtx()
 if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) return } - copCtx, err := newCopContext(b.tbl.Meta(), indexInfo, sessCtx) + copCtx, err := newCopContext(b.tbl.Meta(), indexesInfo, sessCtx) if err != nil { logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err)) return diff --git a/ddl/delete_range.go b/ddl/delete_range.go index d94f05db346bc..a3d72cc204f80 100644 --- a/ddl/delete_range.go +++ b/ddl/delete_range.go @@ -360,11 +360,11 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context, } case model.ActionDropIndex, model.ActionDropPrimaryKey: tableID := job.TableID - var indexName interface{} var partitionIDs []int64 + indexName := make([]interface{}, 1) ifExists := make([]bool, 1) indexID := make([]int64, 1) - if err := job.DecodeArgs(&indexName, &ifExists[0], &indexID[0], &partitionIDs); err != nil { + if err := job.DecodeArgs(&indexName[0], &ifExists[0], &indexID[0], &partitionIDs); err != nil { if err = job.DecodeArgs(&indexName, &ifExists, &indexID, &partitionIDs); err != nil { return errors.Trace(err) } diff --git a/ddl/index.go b/ddl/index.go index d6c191eeda802..cb1cc5a7249bd 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -543,7 +543,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo isPKSlice[0] = false } if err != nil { - if job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption, &hiddenCols, &global, &isPKSlice, &sqlMode, &warnings) != nil { + if err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption, &hiddenCols, &global, &isPKSlice, &sqlMode, &warnings); err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } @@ -620,6 +620,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } logutil.BgLogger().Info("[ddl] run add index job", zap.String("job", job.String()), zap.Reflect("indexInfo", indexInfo)) } + indexesInfo = append(indexesInfo, indexInfo) } originalState := indexesInfo[0].State switch indexesInfo[0].State { @@ -694,6 +695,7 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } indexIDs := make([]int64, 0, len(indexesInfo)) + ifExists := make([]bool, 0, len(indexesInfo)) for _, indexInfo := range indexesInfo { // Set column index flag. AddIndexColumnFlag(tblInfo, indexInfo) @@ -703,12 +705,14 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo } } indexInfo.State = model.StatePublic + indexIDs = append(indexIDs, indexInfo.ID) + ifExists = append(ifExists, false) } ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexesInfo[0].State) if err != nil { return ver, errors.Trace(err) } - job.Args = []interface{}{indexIDs, false /*if exists*/, getPartitionIDs(tbl.Meta())} + job.Args = []interface{}{indexIDs, ifExists, getPartitionIDs(tbl.Meta())} // Finish this job. 
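+ // The args above are what insertJobIntoDeleteRangeTable decodes if this job is
+ // later canceled, so they must stay in the slice layout expected there.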
 job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
 if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
@@ -830,13 +834,12 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
 if !bfProcess.NeedMergeProcess() {
 return runReorgJobAndHandleErr(w, d, t, job, tbl, indexesInfo, false)
 }
- indexInfo := indexesInfo[0]
- switch indexInfo.BackfillState {
+ switch indexesInfo[0].BackfillState {
 case model.BackfillStateRunning:
 logutil.BgLogger().Info("[ddl] index backfill state running",
 zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O),
 zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge),
- zap.String("index", indexInfo.Name.O))
+ zap.String("indexes", indexesInfo[0].Name.O))
 switch bfProcess {
 case model.ReorgTypeLitMerge:
 bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
@@ -849,12 +852,13 @@
 job.RowCount = 0
 return false, ver, nil
 }
- bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
+ // TODO: always use false for now; we still need to support passing bool slices for lwCtx.
+ bc, err = ingest.LitBackCtxMgr.Register(w.ctx, false, job.ID, job.ReorgMeta.SQLMode)
 if err != nil {
 err = tryFallbackToTxnMerge(job, err)
 return false, ver, errors.Trace(err)
 }
- done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
+ done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexesInfo, false)
 if err != nil {
 ingest.LitBackCtxMgr.Unregister(job.ID)
 err = tryFallbackToTxnMerge(job, err)
@@ -863,14 +867,17 @@
 if !done {
 return false, ver, nil
 }
- err = bc.FinishImport(indexInfo.ID, indexInfo.Unique, tbl)
+ // TODO: always use false for now; we still need to support passing bool slices for lwCtx.
+ err = bc.FinishImport(indexesInfo[0].ID, false, tbl)
 if err != nil {
 if kv.ErrKeyExists.Equal(err) || common.ErrFoundDuplicateKeys.Equal(err) {
 logutil.BgLogger().Warn("[ddl] import index duplicate key, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
- if common.ErrFoundDuplicateKeys.Equal(err) {
- err = convertToKeyExistsErr(err, indexInfo, tbl.Meta())
- }
- ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), []*model.IndexInfo{indexInfo}, err)
+ /*
+ if common.ErrFoundDuplicateKeys.Equal(err) {
+ err = convertToKeyExistsErr(err, indexInfo, tbl.Meta())
+ }
+ */
+ ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexesInfo, err)
 } else {
 logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err))
 err = tryFallbackToTxnMerge(job, err)
@@ -880,18 +887,22 @@
 }
 bc.SetDone()
 case model.ReorgTypeTxnMerge:
- done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false)
+ done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexesInfo, false)
 if err != nil || !done {
 return false, ver, errors.Trace(err)
 }
 }
- indexInfo.BackfillState = model.BackfillStateReadyToMerge
+ for _, indexInfo := range indexesInfo {
+ indexInfo.BackfillState = model.BackfillStateReadyToMerge
+ }
 ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true)
 return false, ver, errors.Trace(err)
 case model.BackfillStateReadyToMerge:
 logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID),
- zap.String("table", tbl.Meta().Name.O), zap.String("index", 
indexInfo.Name.O)) - indexInfo.BackfillState = model.BackfillStateMerging + zap.String("table", tbl.Meta().Name.O), zap.String("index", indexesInfo[0].Name.O)) + for _, indexInfo := range indexesInfo { + indexInfo.BackfillState = model.BackfillStateMerging + } if bfProcess == model.ReorgTypeLitMerge { ingest.LitBackCtxMgr.Unregister(job.ID) } @@ -899,29 +910,31 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateMerging: - done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexesInfo, true) if !done { return false, ver, err } - indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. + for _, indexInfo := range indexesInfo { + indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. + } return true, ver, err default: - return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) + return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexesInfo[0].BackfillState) } } func doReorgWorkForCreateIndexWithDistReorg(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, - tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + tbl table.Table, indexInfo []*model.IndexInfo) (done bool, ver int64, err error) { bfProcess := pickBackfillType(w, job) if !bfProcess.NeedMergeProcess() { return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) } - switch indexInfo.BackfillState { + switch indexInfo[0].BackfillState { case model.BackfillStateRunning: logutil.BgLogger().Info("[ddl] index backfill state running", zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge), - zap.String("index", indexInfo.Name.O)) + zap.String("index", indexInfo[0].Name.O)) switch bfProcess { case model.ReorgTypeLitMerge: done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) @@ -938,13 +951,13 @@ func doReorgWorkForCreateIndexWithDistReorg(w *worker, d *ddlCtx, t *meta.Meta, return false, ver, errors.Trace(err) } } - indexInfo.BackfillState = model.BackfillStateReadyToMerge + indexInfo[0].BackfillState = model.BackfillStateReadyToMerge ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) case model.BackfillStateReadyToMerge: logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID), - zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O)) - indexInfo.BackfillState = model.BackfillStateMerging + zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo[0].Name.O)) + indexInfo[0].BackfillState = model.BackfillStateMerging job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg. ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) return false, ver, errors.Trace(err) @@ -953,10 +966,10 @@ func doReorgWorkForCreateIndexWithDistReorg(w *worker, d *ddlCtx, t *meta.Meta, if !done { return false, ver, err } - indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. + indexInfo[0].BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. 
return true, ver, nil default: - return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) + return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo[0].BackfillState) } } @@ -977,12 +990,15 @@ func convertToKeyExistsErr(originErr error, idxInfo *model.IndexInfo, tblInfo *m } func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, - tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { - elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + tbl table.Table, indexInfo []*model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { + elements := make([]*meta.Element, 0, len(indexInfo)) + for _, idx := range indexInfo { + elements = append(elements, &meta.Element{ID: idx.ID, TypeKey: meta.IndexElementKey}) + } failpoint.Inject("mockDMLExecutionStateMerging", func(val failpoint.Value) { //nolint:forcetypeassert - if val.(bool) && indexInfo.BackfillState == model.BackfillStateMerging && + if val.(bool) && indexInfo[0].BackfillState == model.BackfillStateMerging && MockDMLExecutionStateMerging != nil { MockDMLExecutionStateMerging() } @@ -1008,7 +1024,7 @@ func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, err = w.runReorgJob(rh, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { defer util.Recover(metrics.LabelDDL, "onCreateIndex", func() { - addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name) + addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo[0].Name) }, false) return w.addTableIndex(tbl, reorgInfo) }) @@ -1047,40 +1063,54 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { if job.MultiSchemaInfo != nil && !job.IsRollingback() && job.MultiSchemaInfo.Revertible { job.MarkNonRevertible() - job.SchemaState = indexInfo.State + job.SchemaState = indexInfo[0].State return updateVersionAndTableInfo(d, t, job, tblInfo, false) } - originalState := indexInfo.State - switch indexInfo.State { + originalState := indexInfo[0].State + switch indexInfo[0].State { case model.StatePublic: // public -> write only - indexInfo.State = model.StateWriteOnly - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State) + for _, idxInfo := range indexInfo { + idxInfo.State = model.StateWriteOnly + } + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo[0].State) if err != nil { return ver, errors.Trace(err) } case model.StateWriteOnly: // write only -> delete only - indexInfo.State = model.StateDeleteOnly - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State) + for _, idxInfo := range indexInfo { + idxInfo.State = model.StateDeleteOnly + } + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo[0].State) if err != nil { return ver, errors.Trace(err) } case model.StateDeleteOnly: // delete only -> reorganization - indexInfo.State = model.StateDeleteReorganization - ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo.State) + for _, idxInfo := range indexInfo { + idxInfo.State = model.StateDeleteReorganization + } + ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != indexInfo[0].State) if err != nil { return ver, errors.Trace(err) } case 
model.StateDeleteReorganization: // reorganization -> absent - indexInfo.State = model.StateNone - // Set column index flag. - DropIndexColumnFlag(tblInfo, indexInfo) - RemoveDependentHiddenColumns(tblInfo, indexInfo) - removeIndexInfo(tblInfo, indexInfo) + idxIds := make([]int64, 0, len(indexInfo)) + idxNames := make([]model.CIStr, 0, len(indexInfo)) + ifExists := make([]bool, 0, len(indexInfo)) + for _, idxInfo := range indexInfo { + idxInfo.State = model.StateNone + // Set column index flag. + DropIndexColumnFlag(tblInfo, idxInfo) + RemoveDependentHiddenColumns(tblInfo, idxInfo) + removeIndexInfo(tblInfo, idxInfo) + idxIds = append(idxIds, idxInfo.ID) + idxNames = append(idxNames, idxInfo.Name) + ifExists = append(ifExists, false) + } failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) { //nolint:forcetypeassert @@ -1100,17 +1130,19 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { if job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { ingest.LitBackCtxMgr.Unregister(job.ID) } - job.Args[0] = indexInfo.ID + job.Args[0] = idxIds } else { // the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility, // we should keep appending the partitions in the convertAddIdxJob2RollbackJob. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) - job.Args = append(job.Args, indexInfo.ID, getPartitionIDs(tblInfo)) + job.Args[0] = idxNames + job.Args[1] = ifExists + job.Args = append(job.Args, idxIds, getPartitionIDs(tblInfo)) } default: - return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo.State)) + return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", indexInfo[0].State)) } - job.SchemaState = indexInfo.State + job.SchemaState = indexInfo[0].State return ver, errors.Trace(err) } @@ -1150,44 +1182,50 @@ func removeIndexInfo(tblInfo *model.TableInfo, idxInfo *model.IndexInfo) { tblInfo.Indices = append(tblInfo.Indices[:offset], tblInfo.Indices[offset+1:]...) 
 }

-func checkDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo, *model.IndexInfo, bool /* ifExists */, error) {
+func checkDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.IndexInfo, bool /* ifExists */, error) {
 schemaID := job.SchemaID
 tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
 if err != nil {
 return nil, nil, false, errors.Trace(err)
 }

- var indexName model.CIStr
- var ifExists bool
- if err = job.DecodeArgs(&indexName, &ifExists); err != nil {
- job.State = model.JobStateCancelled
- return nil, nil, false, errors.Trace(err)
+ indexName := make([]model.CIStr, 1)
+ ifExists := make([]bool, 1)
+ if err = job.DecodeArgs(&indexName[0], &ifExists[0]); err != nil {
+ if err = job.DecodeArgs(&indexName, &ifExists); err != nil {
+ job.State = model.JobStateCancelled
+ return nil, nil, false, errors.Trace(err)
+ }
 }

- indexInfo := tblInfo.FindIndexByName(indexName.L)
- if indexInfo == nil {
- job.State = model.JobStateCancelled
- return nil, nil, ifExists, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName)
- }
+ indexesInfo := make([]*model.IndexInfo, 0, len(indexName))
+ for i, idxName := range indexName {
+ indexInfo := tblInfo.FindIndexByName(idxName.L)
+ if indexInfo == nil {
+ job.State = model.JobStateCancelled
+ return nil, nil, ifExists[i], dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", idxName)
+ }

- // Double check for drop index on auto_increment column.
- err = CheckDropIndexOnAutoIncrementColumn(tblInfo, indexInfo)
- if err != nil {
- job.State = model.JobStateCancelled
- return nil, nil, false, autoid.ErrWrongAutoKey
- }
+ // Double check for drop index on auto_increment column.
+ err = CheckDropIndexOnAutoIncrementColumn(tblInfo, indexInfo)
+ if err != nil {
+ job.State = model.JobStateCancelled
+ return nil, nil, false, autoid.ErrWrongAutoKey
+ }

- // Check that drop primary index will not cause invisible implicit primary index.
- if err := checkInvisibleIndexesOnPK(tblInfo, []*model.IndexInfo{indexInfo}, job); err != nil {
- job.State = model.JobStateCancelled
- return nil, nil, false, errors.Trace(err)
- }
+ // Check that drop primary index will not cause invisible implicit primary index.
+ if err := checkInvisibleIndexesOnPK(tblInfo, []*model.IndexInfo{indexInfo}, job); err != nil {
+ job.State = model.JobStateCancelled
+ return nil, nil, false, errors.Trace(err)
+ }

- // Double check for drop index needed in foreign key.
- if err := checkIndexNeededInForeignKeyInOwner(d, t, job, job.SchemaName, tblInfo, indexInfo); err != nil {
- return nil, nil, false, errors.Trace(err)
+ // Double check for drop index needed in foreign key.
+ if err := checkIndexNeededInForeignKeyInOwner(d, t, job, job.SchemaName, tblInfo, indexInfo); err != nil {
+ return nil, nil, false, errors.Trace(err)
+ }
+ indexesInfo = append(indexesInfo, indexInfo)
 }
- return tblInfo, indexInfo, false, nil
+ return tblInfo, indexesInfo, false, nil
 }

 func checkInvisibleIndexesOnPK(tblInfo *model.TableInfo, indexInfos []*model.IndexInfo, job *model.Job) error {
@@ -1334,14 +1372,17 @@ type addIndexWorker struct {
 distinctCheckFlags []bool
 }

-func newAddIndexWorker(decodeColMap map[int64]decoder.Column, t table.PhysicalTable, bfCtx *backfillCtx, jc *JobContext, jobID, eleID int64, eleTypeKey []byte) (*addIndexWorker, error) {
- if !bytes.Equal(eleTypeKey, meta.IndexElementKey) {
- logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", jc.cacheSQL),
- zap.Int64("job ID", jobID), zap.ByteString("element type", eleTypeKey), zap.Int64("element ID", eleID))
- return nil, errors.Errorf("element type is not index, typeKey: %v", eleTypeKey)
+func newAddIndexWorker(decodeColMap map[int64]decoder.Column, t table.PhysicalTable, bfCtx *backfillCtx, jc *JobContext, jobID int64, elements []*meta.Element) (*addIndexWorker, error) {
+ var indexes []table.Index
+ for _, ele := range elements {
+ if !bytes.Equal(ele.TypeKey, meta.IndexElementKey) {
+ logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", jc.cacheSQL),
+ zap.Int64("job ID", jobID), zap.ByteString("element type", ele.TypeKey), zap.Int64("element ID", ele.ID))
+ return nil, errors.Errorf("element type is not index, typeKey: %v", ele.TypeKey)
+ }
+ indexInfo := model.FindIndexInfoByID(t.Meta().Indices, ele.ID)
+ indexes = append(indexes, tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo))
 }
- indexInfo := model.FindIndexInfoByID(t.Meta().Indices, eleID)
- index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
 rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)

 var lwCtx *ingest.WriterContext
@@ -1350,11 +1391,12 @@
 if !ok {
 return nil, errors.Trace(errors.New(ingest.LitErrGetBackendFail))
 }
- ei, err := bc.EngMgr.Register(bc, jobID, eleID, bfCtx.schemaName, t.Meta().Name.O)
+ ei, err := bc.EngMgr.Register(bc, jobID, elements[0].ID, bfCtx.schemaName, t.Meta().Name.O)
 if err != nil {
 return nil, errors.Trace(err)
 }
- lwCtx, err = ei.NewWriterCtx(bfCtx.id, indexInfo.Unique)
+ // TODO: always use false for now; we still need to support passing bool slices for lwCtx.
+ lwCtx, err = ei.NewWriterCtx(bfCtx.id, false)
 if err != nil {
 return nil, err
 }
@@ -1363,14 +1405,14 @@
 return &addIndexWorker{
 baseIndexWorker: baseIndexWorker{
 backfillCtx: bfCtx,
- indexes: []table.Index{index},
+ indexes: indexes,
 rowDecoder: rowDecoder,
 defaultVals: make([]types.Datum, len(t.WritableCols())),
 rowMap: make(map[int64]types.Datum, len(decodeColMap)),
 metricCounter: metrics.BackfillTotalCounter.WithLabelValues(metrics.GenerateReorgLabel("add_idx_rate", bfCtx.schemaName, t.Meta().Name.String())),
 jobContext: jc,
 },
- index: index,
+ index: indexes[0],
 writerCtx: lwCtx,
 }, nil
 }
@@ -1443,7 +1485,8 @@ func newAddIndexWorkerContext(d *ddl, schemaName model.CIStr, tbl table.Table, w
 logutil.BgLogger().Error("[ddl] make up decode column map failed", zap.Error(err))
 return nil, errors.Trace(err)
 }
- bf, err1 := newAddIndexWorker(decodeColMap, phyTbl, bfCtx, jobCtx, bfJob.JobID, 
bfJob.EleID, bfJob.EleKey) + element := meta.Element{ID: bfJob.EleID, TypeKey: bfJob.EleKey} + bf, err1 := newAddIndexWorker(decodeColMap, phyTbl, bfCtx, jobCtx, bfJob.JobID, []*meta.Element{&element}) return bf, err1 }) } @@ -1751,7 +1794,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC return errors.Trace(err) } - for _, idxRecord := range idxRecords { + for i, idxRecord := range idxRecords { taskCtx.scanCount++ // The index is already exists, we skip it, no needs to backfill it. // The following update, delete, insert on these rows, TiDB can handle it correctly. @@ -1772,7 +1815,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC // Create the index. if w.writerCtx == nil { - handle, err := w.index.Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill) + handle, err := w.indexes[i%len(w.indexes)].Create(w.sessCtx, txn, idxRecord.vals, idxRecord.handle, idxRecord.rsData, table.WithIgnoreAssertion, table.FromBackfill) if err != nil { if kv.ErrKeyExists.Equal(err) && idxRecord.handle.Equal(handle) { // Index already exists, skip it. @@ -1784,7 +1827,7 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC } else { // The lightning environment is ready. vars := w.sessCtx.GetSessionVars() sCtx, writeBufs := vars.StmtCtx, vars.GetWriteStmtBufs() - iter := w.index.GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData) + iter := w.indexes[i%len(w.indexes)].GenIndexKVIter(sCtx, idxRecord.vals, idxRecord.handle, idxRecord.rsData) for iter.Valid() { key, idxVal, _, err := iter.Next(writeBufs.IndexKeyBuf) if err != nil { diff --git a/ddl/index_cop.go b/ddl/index_cop.go index da9d4e397a6cb..9fed50486a1c7 100644 --- a/ddl/index_cop.go +++ b/ddl/index_cop.go @@ -266,27 +266,30 @@ func (c *copReqSenderPool) recycleIdxRecordsAndChunk(idxRecs []*indexRecord, chk // It is unchanged after initialization. type copContext struct { tblInfo *model.TableInfo - idxInfo *model.IndexInfo + idxInfo []*model.IndexInfo pkInfo *model.IndexInfo colInfos []*model.ColumnInfo fieldTps []*types.FieldType sessCtx sessionctx.Context expColInfos []*expression.Column - idxColOutputOffsets []int + idxColOutputOffsets [][]int handleOutputOffsets []int virtualColOffsets []int virtualColFieldTps []*types.FieldType } -func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx sessionctx.Context) (*copContext, error) { +func newCopContext(tblInfo *model.TableInfo, indexesInfo []*model.IndexInfo, sessCtx sessionctx.Context) (*copContext, error) { var err error - usedColumnIDs := make(map[int64]struct{}, len(idxInfo.Columns)) - usedColumnIDs, err = fillUsedColumns(usedColumnIDs, idxInfo, tblInfo) var handleIDs []int64 - if err != nil { - return nil, err + usedColumnIDs := make(map[int64]struct{}, len(tblInfo.Columns)) + for _, idxInfo := range indexesInfo { + usedColumnIDs, err = fillUsedColumns(usedColumnIDs, idxInfo, tblInfo) + if err != nil { + return nil, err + } } + var primaryIdx *model.IndexInfo if tblInfo.PKIsHandle { pkCol := tblInfo.GetPkColInfo() @@ -306,8 +309,8 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s } // Only collect the columns that are used by the index. 
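+ // With multiple indexes, the used columns are the union over all of them, so the
+ // buffers below are sized by the table's columns rather than a single index's.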
- colInfos := make([]*model.ColumnInfo, 0, len(idxInfo.Columns)) - fieldTps := make([]*types.FieldType, 0, len(idxInfo.Columns)) + colInfos := make([]*model.ColumnInfo, 0, len(tblInfo.Columns)) + fieldTps := make([]*types.FieldType, 0, len(tblInfo.Columns)) for i := range tblInfo.Columns { col := tblInfo.Columns[i] if _, found := usedColumnIDs[col.ID]; found { @@ -329,13 +332,17 @@ func newCopContext(tblInfo *model.TableInfo, idxInfo *model.IndexInfo, sessCtx s if err != nil { return nil, err } - idxOffsets := resolveIndicesForIndex(expColInfos, idxInfo, tblInfo) + + var idxOffsets [][]int + for _, idxInfo := range indexesInfo { + idxOffsets = append(idxOffsets, resolveIndicesForIndex(expColInfos, idxInfo, tblInfo)) + } hdColOffsets := resolveIndicesForHandle(expColInfos, handleIDs) vColOffsets, vColFts := collectVirtualColumnOffsetsAndTypes(expColInfos) copCtx := &copContext{ tblInfo: tblInfo, - idxInfo: idxInfo, + idxInfo: indexesInfo, pkInfo: primaryIdx, colInfos: colInfos, fieldTps: fieldTps, @@ -452,14 +459,16 @@ func (c *copContext) fetchTableScanResult(ctx context.Context, result distsql.Se return nil, false, errors.Trace(err) } for row := iter.Begin(); row != iter.End(); row = iter.Next() { - idxDt := extractDatumByOffsets(row, c.idxColOutputOffsets, c.expColInfos) hdDt := extractDatumByOffsets(row, c.handleOutputOffsets, c.expColInfos) handle, err := buildHandle(hdDt, c.tblInfo, c.pkInfo, sctx) if err != nil { return nil, false, errors.Trace(err) } - rsData := getRestoreData(c.tblInfo, c.idxInfo, c.pkInfo, hdDt) - buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false}) + for i, idxInfo := range c.idxInfo { + idxDt := extractDatumByOffsets(row, c.idxColOutputOffsets[i], c.expColInfos) + rsData := getRestoreData(c.tblInfo, idxInfo, c.pkInfo, hdDt) + buf = append(buf, &indexRecord{handle: handle, key: nil, vals: idxDt, rsData: rsData, skip: false}) + } } return buf, false, nil } diff --git a/ddl/index_cop_test.go b/ddl/index_cop_test.go index 5edc1680b2308..2f8557afe8889 100644 --- a/ddl/index_cop_test.go +++ b/ddl/index_cop_test.go @@ -38,7 +38,7 @@ func TestAddIndexFetchRowsFromCoprocessor(t *testing.T) { require.NoError(t, err) tblInfo := tbl.Meta() idxInfo := tblInfo.FindIndexByName(idx) - copCtx, err := ddl.NewCopContext4Test(tblInfo, idxInfo, tk.Session()) + copCtx, err := ddl.NewCopContext4Test(tblInfo, []*model.IndexInfo{idxInfo}, tk.Session()) require.NoError(t, err) startKey := tbl.RecordPrefix() endKey := startKey.PrefixNext()
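Note: each patch in this series leans on the same backward-compatible args decoding: try the
legacy single-value layout first, then fall back to the slice layout produced by mergeAddIndex.
A minimal, self-contained sketch of that pattern, assuming JSON-encoded args as in model.Job
(the names below are illustrative, not code from this series):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // decodeIndexIDs accepts either the legacy single-ID encoding or the
    // newer slice encoding, mirroring the job.DecodeArgs fallback above.
    func decodeIndexIDs(raw []byte) ([]int64, error) {
        var one int64
        if err := json.Unmarshal(raw, &one); err == nil {
            return []int64{one}, nil
        }
        var many []int64
        if err := json.Unmarshal(raw, &many); err != nil {
            return nil, err
        }
        return many, nil
    }

    func main() {
        ids, _ := decodeIndexIDs([]byte(`7`))
        fmt.Println(ids) // [7]
        ids, _ = decodeIndexIDs([]byte(`[7, 8]`))
        fmt.Println(ids) // [7 8]
    }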