diff --git a/ddl/backfilling.go b/ddl/backfilling.go index 94f5a2ad650a3..93e376da567f0 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -519,8 +519,7 @@ func loadDDLReorgVars(w *worker) error { return ddlutil.LoadDDLReorgVars(w.ddlJobCtx, ctx) } -func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) { - dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB) +func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) { writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols())) for _, col := range t.WritableCols() { writableColInfos = append(writableColInfos, col.ColumnInfo) @@ -556,7 +555,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey sessCtx := newContext(reorgInfo.d.store) - decodeColMap, err := makeupDecodeColMap(sessCtx, t) + decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t) if err != nil { return errors.Trace(err) } diff --git a/ddl/column.go b/ddl/column.go index 632d370aa612e..9c514f613b6b0 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -1141,6 +1141,253 @@ func (w *worker) doModifyColumnTypeWithData( return ver, errors.Trace(err) } +<<<<<<< HEAD +======= +func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, + oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { + if job.MultiSchemaInfo.Revertible { + done, ver, err = doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs) + if done { + // We need another round to wait for all the others sub-jobs to finish. + job.MarkNonRevertible() + } + // We need another round to run the reorg process. + return false, ver, err + } + // Non-revertible means all the sub jobs finished. + return true, ver, err +} + +func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, + oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) { + job.ReorgMeta.ReorgTp = model.ReorgTypeTxn + rh := newReorgHandler(t, w.sess, w.concurrentDDL) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) + } + reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false) + if err != nil || reorgInfo.first { + // If we run reorg firstly, we should update the job snapshot version + // and then run the reorg next time. + return false, ver, errors.Trace(err) + } + + // Inject a failpoint so that we can pause here and do verification on other components. + // With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command: + // enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData". + // disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData" + failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {}) + err = w.runReorgJob(rh, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + defer util.Recover(metrics.LabelDDL, "onModifyColumn", + func() { + addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name) + }, false) + // Use old column name to generate less confusing error messages. + changingColCpy := changingCol.Clone() + changingColCpy.Name = oldCol.Name + return w.updateCurrentElement(tbl, reorgInfo) + }) + if err != nil { + if dbterror.ErrWaitReorgTimeout.Equal(err) { + // If timeout, we should return, check for the owner and re-wait job done. + return false, ver, nil + } + if kv.IsTxnRetryableError(err) || dbterror.ErrNotOwner.Equal(err) { + return false, ver, errors.Trace(err) + } + if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback", + zap.String("job", job.String()), zap.Error(err1)) + } + logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + job.State = model.JobStateRollingback + return false, ver, errors.Trace(err) + } + return true, ver, nil +} + +func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast.ColumnPosition, + oldCol, changingCol *model.ColumnInfo, newName model.CIStr, changingIdxs []*model.IndexInfo) (err error) { + if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L { + // For cases like `modify column b after b`, it should report this error. + return errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name)) + } + internalColName := changingCol.Name + changingCol = replaceOldColumn(tblInfo, oldCol, changingCol, newName) + if len(changingIdxs) > 0 { + updateNewIdxColsNameOffset(changingIdxs, internalColName, changingCol) + indexesToRemove := filterIndexesToRemove(changingIdxs, newName, tblInfo) + replaceOldIndexes(tblInfo, indexesToRemove) + } + if tblInfo.TTLInfo != nil { + updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, changingCol.Name) + } + // Move the new column to a correct offset. + destOffset, err := LocateOffsetToMove(changingCol.Offset, pos, tblInfo) + if err != nil { + return errors.Trace(err) + } + tblInfo.MoveColumnInfo(changingCol.Offset, destOffset) + return nil +} + +func replaceOldColumn(tblInfo *model.TableInfo, oldCol, changingCol *model.ColumnInfo, + newName model.CIStr) *model.ColumnInfo { + tblInfo.MoveColumnInfo(changingCol.Offset, len(tblInfo.Columns)-1) + changingCol = updateChangingCol(changingCol, newName, oldCol.Offset) + tblInfo.Columns[oldCol.Offset] = changingCol + tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1] + return changingCol +} + +func replaceOldIndexes(tblInfo *model.TableInfo, changingIdxs []*model.IndexInfo) { + // Remove the changing indexes. + for i, idx := range tblInfo.Indices { + for _, cIdx := range changingIdxs { + if cIdx.ID == idx.ID { + tblInfo.Indices[i] = nil + break + } + } + } + tmp := tblInfo.Indices[:0] + for _, idx := range tblInfo.Indices { + if idx != nil { + tmp = append(tmp, idx) + } + } + tblInfo.Indices = tmp + // Replace the old indexes with changing indexes. + for _, cIdx := range changingIdxs { + // The index name should be changed from '_Idx$_name' to 'name'. + idxName := getChangingIndexOriginName(cIdx) + for i, idx := range tblInfo.Indices { + if strings.EqualFold(idxName, idx.Name.O) { + cIdx.Name = model.NewCIStr(idxName) + tblInfo.Indices[i] = cIdx + break + } + } + } +} + +// updateNewIdxColsNameOffset updates the name&offset of the index column. +func updateNewIdxColsNameOffset(changingIdxs []*model.IndexInfo, + oldName model.CIStr, changingCol *model.ColumnInfo) { + for _, idx := range changingIdxs { + for _, col := range idx.Columns { + if col.Name.L == oldName.L { + SetIdxColNameOffset(col, changingCol) + } + } + } +} + +func updateFKInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) { + if oldCol.L == newCol.L { + return + } + for _, fk := range tblInfo.ForeignKeys { + for i := range fk.Cols { + if fk.Cols[i].L == oldCol.L { + fk.Cols[i] = newCol + } + } + } +} + +func updateTTLInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) { + if oldCol.L == newCol.L { + return + } + if tblInfo.TTLInfo != nil { + if tblInfo.TTLInfo.ColumnName.L == oldCol.L { + tblInfo.TTLInfo.ColumnName = newCol + } + } +} + +// filterIndexesToRemove filters out the indexes that can be removed. +func filterIndexesToRemove(changingIdxs []*model.IndexInfo, colName model.CIStr, tblInfo *model.TableInfo) []*model.IndexInfo { + indexesToRemove := make([]*model.IndexInfo, 0, len(changingIdxs)) + for _, idx := range changingIdxs { + var hasOtherChangingCol bool + for _, col := range idx.Columns { + if col.Name.L == colName.L { + continue // ignore the current modifying column. + } + if !hasOtherChangingCol { + hasOtherChangingCol = tblInfo.Columns[col.Offset].ChangeStateInfo != nil + } + } + // For the indexes that still contains other changing column, skip removing it now. + // We leave the removal work to the last modify column job. + if !hasOtherChangingCol { + indexesToRemove = append(indexesToRemove, idx) + } + } + return indexesToRemove +} + +func updateChangingCol(col *model.ColumnInfo, newName model.CIStr, newOffset int) *model.ColumnInfo { + col.Name = newName + col.ChangeStateInfo = nil + col.Offset = newOffset + // After changing the column, the column's type is change, so it needs to set OriginDefaultValue back + // so that there is no error in getting the default value from OriginDefaultValue. + // Besides, nil data that was not backfilled in the "add column" is backfilled after the column is changed. + // So it can set OriginDefaultValue to nil. + col.OriginDefaultValue = nil + return col +} + +func buildRelatedIndexInfos(tblInfo *model.TableInfo, colID int64) []*model.IndexInfo { + var indexInfos []*model.IndexInfo + for _, idx := range tblInfo.Indices { + if idx.HasColumnInIndexColumns(tblInfo, colID) { + indexInfos = append(indexInfos, idx) + } + } + return indexInfos +} + +func buildRelatedIndexIDs(tblInfo *model.TableInfo, colID int64) []int64 { + var oldIdxIDs []int64 + for _, idx := range tblInfo.Indices { + if idx.HasColumnInIndexColumns(tblInfo, colID) { + oldIdxIDs = append(oldIdxIDs, idx.ID) + } + } + return oldIdxIDs +} + +// LocateOffsetToMove returns the offset of the column to move. +func LocateOffsetToMove(currentOffset int, pos *ast.ColumnPosition, tblInfo *model.TableInfo) (destOffset int, err error) { + if pos == nil { + return currentOffset, nil + } + // Get column offset. + switch pos.Tp { + case ast.ColumnPositionFirst: + return 0, nil + case ast.ColumnPositionAfter: + c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L) + if c == nil || c.State != model.StatePublic { + return 0, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name) + } + if currentOffset <= c.Offset { + return c.Offset, nil + } + return c.Offset + 1, nil + case ast.ColumnPositionNone: + return currentOffset, nil + default: + return 0, errors.Errorf("unknown column position type") + } +} + +>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822)) // BuildElements is exported for testing. func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) []*meta.Element { elements := make([]*meta.Element, 0, len(changingIdxs)+1) diff --git a/ddl/db_change_test.go b/ddl/db_change_test.go index 6b6e9d8132b0a..ebcfe89125fe5 100644 --- a/ddl/db_change_test.go +++ b/ddl/db_change_test.go @@ -1923,6 +1923,14 @@ func (s *serialTestStateChangeSuite) TestCreateExpressionIndex(c *C) { c.Assert(checkErr, IsNil) tk.MustExec("admin check table t") tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10")) + + // https://github.com/pingcap/tidb/issues/39784 + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(name varchar(20))") + tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')") + tk.MustExec("create index idx on test.t((lower(test.t.name)))") + tk.MustExec("admin check table t") } func (s *serialTestStateChangeSuite) TestCreateUniqueExpressionIndex(c *C) { diff --git a/ddl/index.go b/ddl/index.go index b817d13ee38c1..57e8630ec7d3a 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -615,6 +615,199 @@ func (w *worker) onCreateIndex(d *ddlCtx, t *meta.Meta, job *model.Job, isPK boo func onDropIndex(t *meta.Meta, job *model.Job) (ver int64, _ error) { tblInfo, indexInfo, err := checkDropIndex(t, job) if err != nil { +<<<<<<< HEAD +======= + return false + } + defer w.sessPool.put(ctx) + failpoint.Inject("EnablePiTR", func() { + logutil.BgLogger().Info("lightning: mock enable PiTR") + failpoint.Return(true) + }) + // Ingest way is not compatible with PiTR. + return !utils.IsLogBackupInUse(ctx) +} + +// IngestJobsNotExisted checks the ddl about `add index` with ingest method not existed. +func IngestJobsNotExisted(ctx sessionctx.Context) bool { + sess := session{ctx} + template := "select job_meta from mysql.tidb_ddl_job where reorg and (type = %d or type = %d) and processing;" + sql := fmt.Sprintf(template, model.ActionAddIndex, model.ActionAddPrimaryKey) + rows, err := sess.execute(context.Background(), sql, "check-pitr") + if err != nil { + logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err)) + return false + } + for _, row := range rows { + jobBinary := row.GetBytes(0) + runJob := model.Job{} + err := runJob.Decode(jobBinary) + if err != nil { + logutil.BgLogger().Warn("cannot check ingest job", zap.Error(err)) + return false + } + // Check whether this add index job is using lightning to do the backfill work. + if runJob.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { + return false + } + } + return true +} + +// tryFallbackToTxnMerge changes the reorg type to txn-merge if the lightning backfill meets something wrong. +func tryFallbackToTxnMerge(job *model.Job, err error) error { + if job.State != model.JobStateRollingback { + logutil.BgLogger().Info("[ddl] fallback to txn-merge backfill process", zap.Error(err)) + job.ReorgMeta.ReorgTp = model.ReorgTypeTxnMerge + job.SnapshotVer = 0 + job.RowCount = 0 + return nil + } + return err +} + +func doReorgWorkForCreateIndexMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, + tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + if job.MultiSchemaInfo.Revertible { + done, ver, err = doReorgWorkForCreateIndex(w, d, t, job, tbl, indexInfo) + if done { + job.MarkNonRevertible() + } + // We need another round to wait for all the others sub-jobs to finish. + return false, ver, err + } + return true, ver, err +} + +func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, + tbl table.Table, indexInfo *model.IndexInfo) (done bool, ver int64, err error) { + bfProcess := pickBackfillType(w, job) + if !bfProcess.NeedMergeProcess() { + return runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + } + switch indexInfo.BackfillState { + case model.BackfillStateRunning: + logutil.BgLogger().Info("[ddl] index backfill state running", + zap.Int64("job ID", job.ID), zap.String("table", tbl.Meta().Name.O), + zap.Bool("ingest mode", bfProcess == model.ReorgTypeLitMerge), + zap.String("index", indexInfo.Name.O)) + switch bfProcess { + case model.ReorgTypeLitMerge: + bc, ok := ingest.LitBackCtxMgr.Load(job.ID) + if ok && bc.Done() { + break + } + if !ok && job.SnapshotVer != 0 { + // The owner is crashed or changed, we need to restart the backfill. + job.SnapshotVer = 0 + job.RowCount = 0 + return false, ver, nil + } + bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode) + if err != nil { + err = tryFallbackToTxnMerge(job, err) + return false, ver, errors.Trace(err) + } + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + if err != nil { + ingest.LitBackCtxMgr.Unregister(job.ID) + err = tryFallbackToTxnMerge(job, err) + return false, ver, errors.Trace(err) + } + if !done { + return false, ver, nil + } + err = bc.FinishImport(indexInfo.ID, indexInfo.Unique, tbl) + if err != nil { + if kv.ErrKeyExists.Equal(err) { + logutil.BgLogger().Warn("[ddl] import index duplicate key, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) + } else { + logutil.BgLogger().Warn("[ddl] lightning import error", zap.Error(err)) + err = tryFallbackToTxnMerge(job, err) + } + ingest.LitBackCtxMgr.Unregister(job.ID) + return false, ver, errors.Trace(err) + } + bc.SetDone() + case model.ReorgTypeTxnMerge: + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, false) + if err != nil || !done { + return false, ver, errors.Trace(err) + } + } + indexInfo.BackfillState = model.BackfillStateReadyToMerge + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) + return false, ver, errors.Trace(err) + case model.BackfillStateReadyToMerge: + logutil.BgLogger().Info("[ddl] index backfill state ready to merge", zap.Int64("job ID", job.ID), + zap.String("table", tbl.Meta().Name.O), zap.String("index", indexInfo.Name.O)) + indexInfo.BackfillState = model.BackfillStateMerging + if bfProcess == model.ReorgTypeLitMerge { + ingest.LitBackCtxMgr.Unregister(job.ID) + } + job.SnapshotVer = 0 // Reset the snapshot version for merge index reorg. + ver, err = updateVersionAndTableInfo(d, t, job, tbl.Meta(), true) + return false, ver, errors.Trace(err) + case model.BackfillStateMerging: + done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, indexInfo, true) + if !done { + return false, ver, err + } + indexInfo.BackfillState = model.BackfillStateInapplicable // Prevent double-write on this index. + return true, ver, nil + default: + return false, 0, dbterror.ErrInvalidDDLState.GenWithStackByArgs("backfill", indexInfo.BackfillState) + } +} + +func runReorgJobAndHandleErr(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, + tbl table.Table, indexInfo *model.IndexInfo, mergingTmpIdx bool) (done bool, ver int64, err error) { + elements := []*meta.Element{{ID: indexInfo.ID, TypeKey: meta.IndexElementKey}} + rh := newReorgHandler(t, w.sess, w.concurrentDDL) + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return false, ver, errors.Trace(err) + } + reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, elements, mergingTmpIdx) + if err != nil || reorgInfo.first { + // If we run reorg firstly, we should update the job snapshot version + // and then run the reorg next time. + return false, ver, errors.Trace(err) + } + err = w.runReorgJob(rh, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) { + defer util.Recover(metrics.LabelDDL, "onCreateIndex", + func() { + addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name) + }, false) + return w.addTableIndex(tbl, reorgInfo) + }) + if err != nil { + if dbterror.ErrWaitReorgTimeout.Equal(err) { + // if timeout, we should return, check for the owner and re-wait job done. + return false, ver, nil + } + if kv.ErrKeyExists.Equal(err) || dbterror.ErrCancelledDDLJob.Equal(err) || dbterror.ErrCantDecodeRecord.Equal(err) { + logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err)) + ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), indexInfo, err) + if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil { + logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1)) + } + } + return false, ver, errors.Trace(err) + } + return true, ver, nil +} + +func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { + tblInfo, indexInfo, ifExists, err := checkDropIndex(d, t, job) + if err != nil { + if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) { + job.Warning = toTError(err) + job.State = model.JobStateDone + return ver, nil + } +>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822)) return ver, errors.Trace(err) } if tblInfo.TableCacheStatusType != model.TableCacheStatusDisable { diff --git a/ddl/partition.go b/ddl/partition.go index efa3e75e1f2fc..ce9311a0786cd 100644 --- a/ddl/partition.go +++ b/ddl/partition.go @@ -1147,6 +1147,10 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( if err != nil { return ver, errors.Trace(err) } + dbInfo, err := t.GetDatabase(job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } // If table has global indexes, we need reorg to clean up them. if pt, ok := tbl.(table.PartitionedTable); ok && hasGlobalIndex(tblInfo) { // Build elements for compatible with modify column type. elements will not be used when reorganizing. @@ -1156,7 +1160,12 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) ( elements = append(elements, &meta.Element{ID: idxInfo.ID, TypeKey: meta.IndexElementKey}) } } +<<<<<<< HEAD reorgInfo, err := getReorgInfoFromPartitions(d, t, job, tbl, physicalTableIDs, elements) +======= + rh := newReorgHandler(t, w.sess, w.concurrentDDL) + reorgInfo, err := getReorgInfoFromPartitions(d.jobContext(job), d, rh, job, dbInfo, tbl, physicalTableIDs, elements) +>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822)) if err != nil || reorgInfo.first { // If we run reorg firstly, we should update the job snapshot version diff --git a/ddl/reorg.go b/ddl/reorg.go index deeac2bfdb8ae..0d2a295e70f57 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -385,6 +385,7 @@ type reorgInfo struct { // PhysicalTableID is used to trace the current partition we are handling. // If the table is not partitioned, PhysicalTableID would be TableID. PhysicalTableID int64 + dbInfo *model.DBInfo elements []*meta.Element currElement *meta.Element } @@ -578,7 +579,12 @@ func getValidCurrentVersion(store kv.Storage) (ver kv.Version, err error) { return ver, nil } +<<<<<<< HEAD func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elements []*meta.Element) (*reorgInfo, error) { +======= +func getReorgInfo(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, + tbl table.Table, elements []*meta.Element, mergingTmpIdx bool) (*reorgInfo, error) { +>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822)) var ( element *meta.Element start kv.Key @@ -647,7 +653,73 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem }) var err error +<<<<<<< HEAD element, start, end, pid, err = t.GetDDLReorgHandle(job) +======= + element, start, end, pid, err = rh.GetDDLReorgHandle(job) + if err != nil { + // If the reorg element doesn't exist, this reorg info should be saved by the older TiDB versions. + // It's compatible with the older TiDB versions. + // We'll try to remove it in the next major TiDB version. + if meta.ErrDDLReorgElementNotExist.Equal(err) { + job.SnapshotVer = 0 + logutil.BgLogger().Warn("[ddl] get reorg info, the element does not exist", zap.String("job", job.String()), zap.Bool("enableConcurrentDDL", rh.enableConcurrentDDL)) + } + return &info, errors.Trace(err) + } + } + info.Job = job + info.d = d + info.StartKey = start + info.EndKey = end + info.PhysicalTableID = pid + info.currElement = element + info.elements = elements + info.mergingTmpIdx = mergingTmpIdx + info.dbInfo = dbInfo + + return &info, nil +} + +func getReorgInfoFromPartitions(ctx *JobContext, d *ddlCtx, rh *reorgHandler, job *model.Job, dbInfo *model.DBInfo, tbl table.Table, partitionIDs []int64, elements []*meta.Element) (*reorgInfo, error) { + var ( + element *meta.Element + start kv.Key + end kv.Key + pid int64 + info reorgInfo + ) + if job.SnapshotVer == 0 { + info.first = true + if d.lease > 0 { // Only delay when it's not in test. + delayForAsyncCommit() + } + ver, err := getValidCurrentVersion(d.store) + if err != nil { + return nil, errors.Trace(err) + } + pid = partitionIDs[0] + tb := tbl.(table.PartitionedTable).GetPartition(pid) + start, end, err = getTableRange(ctx, d, tb, ver.Ver, job.Priority) + if err != nil { + return nil, errors.Trace(err) + } + logutil.BgLogger().Info("[ddl] job get table range", + zap.Int64("job ID", job.ID), zap.Int64("physical table ID", pid), + zap.String("start key", hex.EncodeToString(start)), + zap.String("end key", hex.EncodeToString(end))) + + err = rh.InitDDLReorgHandle(job, start, end, pid, elements[0]) + if err != nil { + return &info, errors.Trace(err) + } + // Update info should after data persistent. + job.SnapshotVer = ver.Ver + element = elements[0] + } else { + var err error + element, start, end, pid, err = rh.GetDDLReorgHandle(job) +>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822)) if err != nil { // If the reorg element doesn't exist, this reorg info should be saved by the older TiDB versions. // It's compatible with the older TiDB versions. @@ -666,6 +738,7 @@ func getReorgInfo(d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table, elem info.PhysicalTableID = pid info.currElement = element info.elements = elements + info.dbInfo = dbInfo return &info, nil }