Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#39822
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Defined2014 authored and ti-chi-bot committed Dec 13, 2022
1 parent 4b777b4 commit a0dfc56
Show file tree
Hide file tree
Showing 6 changed files with 532 additions and 3 deletions.
5 changes: 2 additions & 3 deletions ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,7 @@ func loadDDLReorgVars(w *worker) error {
return ddlutil.LoadDDLReorgVars(w.ddlJobCtx, ctx)
}

func makeupDecodeColMap(sessCtx sessionctx.Context, t table.Table) (map[int64]decoder.Column, error) {
dbName := model.NewCIStr(sessCtx.GetSessionVars().CurrentDB)
func makeupDecodeColMap(sessCtx sessionctx.Context, dbName model.CIStr, t table.Table) (map[int64]decoder.Column, error) {
writableColInfos := make([]*model.ColumnInfo, 0, len(t.WritableCols()))
for _, col := range t.WritableCols() {
writableColInfos = append(writableColInfos, col.ColumnInfo)
Expand Down Expand Up @@ -556,7 +555,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

startKey, endKey := reorgInfo.StartKey, reorgInfo.EndKey
sessCtx := newContext(reorgInfo.d.store)
decodeColMap, err := makeupDecodeColMap(sessCtx, t)
decodeColMap, err := makeupDecodeColMap(sessCtx, reorgInfo.dbInfo.Name, t)
if err != nil {
return errors.Trace(err)
}
Expand Down
247 changes: 247 additions & 0 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,253 @@ func (w *worker) doModifyColumnTypeWithData(
return ver, errors.Trace(err)
}

<<<<<<< HEAD
=======
func doReorgWorkForModifyColumnMultiSchema(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
if job.MultiSchemaInfo.Revertible {
done, ver, err = doReorgWorkForModifyColumn(w, d, t, job, tbl, oldCol, changingCol, changingIdxs)
if done {
// We need another round to wait for all the others sub-jobs to finish.
job.MarkNonRevertible()
}
// We need another round to run the reorg process.
return false, ver, err
}
// Non-revertible means all the sub jobs finished.
return true, ver, err
}

func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tbl table.Table,
oldCol, changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) (done bool, ver int64, err error) {
job.ReorgMeta.ReorgTp = model.ReorgTypeTxn
rh := newReorgHandler(t, w.sess, w.concurrentDDL)
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return false, ver, errors.Trace(err)
}
reorgInfo, err := getReorgInfo(d.jobContext(job), d, rh, job, dbInfo, tbl, BuildElements(changingCol, changingIdxs), false)
if err != nil || reorgInfo.first {
// If we run reorg firstly, we should update the job snapshot version
// and then run the reorg next time.
return false, ver, errors.Trace(err)
}

// Inject a failpoint so that we can pause here and do verification on other components.
// With a failpoint-enabled version of TiDB, you can trigger this failpoint by the following command:
// enable: curl -X PUT -d "pause" "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData".
// disable: curl -X DELETE "http://127.0.0.1:10080/fail/github.com/pingcap/tidb/ddl/mockDelayInModifyColumnTypeWithData"
failpoint.Inject("mockDelayInModifyColumnTypeWithData", func() {})
err = w.runReorgJob(rh, reorgInfo, tbl.Meta(), d.lease, func() (addIndexErr error) {
defer util.Recover(metrics.LabelDDL, "onModifyColumn",
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("modify table `%v` column `%v` panic", tbl.Meta().Name, oldCol.Name)
}, false)
// Use old column name to generate less confusing error messages.
changingColCpy := changingCol.Clone()
changingColCpy.Name = oldCol.Name
return w.updateCurrentElement(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
// If timeout, we should return, check for the owner and re-wait job done.
return false, ver, nil
}
if kv.IsTxnRetryableError(err) || dbterror.ErrNotOwner.Equal(err) {
return false, ver, errors.Trace(err)
}
if err1 := rh.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
logutil.BgLogger().Warn("[ddl] run modify column job failed, RemoveDDLReorgHandle failed, can't convert job to rollback",
zap.String("job", job.String()), zap.Error(err1))
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
return false, ver, errors.Trace(err)
}
return true, ver, nil
}

func adjustTableInfoAfterModifyColumnWithData(tblInfo *model.TableInfo, pos *ast.ColumnPosition,
oldCol, changingCol *model.ColumnInfo, newName model.CIStr, changingIdxs []*model.IndexInfo) (err error) {
if pos != nil && pos.RelativeColumn != nil && oldCol.Name.L == pos.RelativeColumn.Name.L {
// For cases like `modify column b after b`, it should report this error.
return errors.Trace(infoschema.ErrColumnNotExists.GenWithStackByArgs(oldCol.Name, tblInfo.Name))
}
internalColName := changingCol.Name
changingCol = replaceOldColumn(tblInfo, oldCol, changingCol, newName)
if len(changingIdxs) > 0 {
updateNewIdxColsNameOffset(changingIdxs, internalColName, changingCol)
indexesToRemove := filterIndexesToRemove(changingIdxs, newName, tblInfo)
replaceOldIndexes(tblInfo, indexesToRemove)
}
if tblInfo.TTLInfo != nil {
updateTTLInfoWhenModifyColumn(tblInfo, oldCol.Name, changingCol.Name)
}
// Move the new column to a correct offset.
destOffset, err := LocateOffsetToMove(changingCol.Offset, pos, tblInfo)
if err != nil {
return errors.Trace(err)
}
tblInfo.MoveColumnInfo(changingCol.Offset, destOffset)
return nil
}

func replaceOldColumn(tblInfo *model.TableInfo, oldCol, changingCol *model.ColumnInfo,
newName model.CIStr) *model.ColumnInfo {
tblInfo.MoveColumnInfo(changingCol.Offset, len(tblInfo.Columns)-1)
changingCol = updateChangingCol(changingCol, newName, oldCol.Offset)
tblInfo.Columns[oldCol.Offset] = changingCol
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-1]
return changingCol
}

func replaceOldIndexes(tblInfo *model.TableInfo, changingIdxs []*model.IndexInfo) {
// Remove the changing indexes.
for i, idx := range tblInfo.Indices {
for _, cIdx := range changingIdxs {
if cIdx.ID == idx.ID {
tblInfo.Indices[i] = nil
break
}
}
}
tmp := tblInfo.Indices[:0]
for _, idx := range tblInfo.Indices {
if idx != nil {
tmp = append(tmp, idx)
}
}
tblInfo.Indices = tmp
// Replace the old indexes with changing indexes.
for _, cIdx := range changingIdxs {
// The index name should be changed from '_Idx$_name' to 'name'.
idxName := getChangingIndexOriginName(cIdx)
for i, idx := range tblInfo.Indices {
if strings.EqualFold(idxName, idx.Name.O) {
cIdx.Name = model.NewCIStr(idxName)
tblInfo.Indices[i] = cIdx
break
}
}
}
}

// updateNewIdxColsNameOffset updates the name&offset of the index column.
func updateNewIdxColsNameOffset(changingIdxs []*model.IndexInfo,
oldName model.CIStr, changingCol *model.ColumnInfo) {
for _, idx := range changingIdxs {
for _, col := range idx.Columns {
if col.Name.L == oldName.L {
SetIdxColNameOffset(col, changingCol)
}
}
}
}

func updateFKInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) {
if oldCol.L == newCol.L {
return
}
for _, fk := range tblInfo.ForeignKeys {
for i := range fk.Cols {
if fk.Cols[i].L == oldCol.L {
fk.Cols[i] = newCol
}
}
}
}

func updateTTLInfoWhenModifyColumn(tblInfo *model.TableInfo, oldCol, newCol model.CIStr) {
if oldCol.L == newCol.L {
return
}
if tblInfo.TTLInfo != nil {
if tblInfo.TTLInfo.ColumnName.L == oldCol.L {
tblInfo.TTLInfo.ColumnName = newCol
}
}
}

// filterIndexesToRemove filters out the indexes that can be removed.
func filterIndexesToRemove(changingIdxs []*model.IndexInfo, colName model.CIStr, tblInfo *model.TableInfo) []*model.IndexInfo {
indexesToRemove := make([]*model.IndexInfo, 0, len(changingIdxs))
for _, idx := range changingIdxs {
var hasOtherChangingCol bool
for _, col := range idx.Columns {
if col.Name.L == colName.L {
continue // ignore the current modifying column.
}
if !hasOtherChangingCol {
hasOtherChangingCol = tblInfo.Columns[col.Offset].ChangeStateInfo != nil
}
}
// For the indexes that still contains other changing column, skip removing it now.
// We leave the removal work to the last modify column job.
if !hasOtherChangingCol {
indexesToRemove = append(indexesToRemove, idx)
}
}
return indexesToRemove
}

func updateChangingCol(col *model.ColumnInfo, newName model.CIStr, newOffset int) *model.ColumnInfo {
col.Name = newName
col.ChangeStateInfo = nil
col.Offset = newOffset
// After changing the column, the column's type is change, so it needs to set OriginDefaultValue back
// so that there is no error in getting the default value from OriginDefaultValue.
// Besides, nil data that was not backfilled in the "add column" is backfilled after the column is changed.
// So it can set OriginDefaultValue to nil.
col.OriginDefaultValue = nil
return col
}

func buildRelatedIndexInfos(tblInfo *model.TableInfo, colID int64) []*model.IndexInfo {
var indexInfos []*model.IndexInfo
for _, idx := range tblInfo.Indices {
if idx.HasColumnInIndexColumns(tblInfo, colID) {
indexInfos = append(indexInfos, idx)
}
}
return indexInfos
}

func buildRelatedIndexIDs(tblInfo *model.TableInfo, colID int64) []int64 {
var oldIdxIDs []int64
for _, idx := range tblInfo.Indices {
if idx.HasColumnInIndexColumns(tblInfo, colID) {
oldIdxIDs = append(oldIdxIDs, idx.ID)
}
}
return oldIdxIDs
}

// LocateOffsetToMove returns the offset of the column to move.
func LocateOffsetToMove(currentOffset int, pos *ast.ColumnPosition, tblInfo *model.TableInfo) (destOffset int, err error) {
if pos == nil {
return currentOffset, nil
}
// Get column offset.
switch pos.Tp {
case ast.ColumnPositionFirst:
return 0, nil
case ast.ColumnPositionAfter:
c := model.FindColumnInfo(tblInfo.Columns, pos.RelativeColumn.Name.L)
if c == nil || c.State != model.StatePublic {
return 0, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
}
if currentOffset <= c.Offset {
return c.Offset, nil
}
return c.Offset + 1, nil
case ast.ColumnPositionNone:
return currentOffset, nil
default:
return 0, errors.Errorf("unknown column position type")
}
}

>>>>>>> b73eb4bf4c (ddl: fix unexpect fail when create expression index (#39822))
// BuildElements is exported for testing.
func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInfo) []*meta.Element {
elements := make([]*meta.Element, 0, len(changingIdxs)+1)
Expand Down
8 changes: 8 additions & 0 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,14 @@ func (s *serialTestStateChangeSuite) TestCreateExpressionIndex(c *C) {
c.Assert(checkErr, IsNil)
tk.MustExec("admin check table t")
tk.MustQuery("select * from t order by a, b").Check(testkit.Rows("0 9", "0 11", "0 11", "1 7", "2 7", "5 7", "8 8", "10 10", "10 10"))

// https://github.com/pingcap/tidb/issues/39784
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(name varchar(20))")
tk.MustExec("insert into t values ('Abc'), ('Bcd'), ('abc')")
tk.MustExec("create index idx on test.t((lower(test.t.name)))")
tk.MustExec("admin check table t")
}

func (s *serialTestStateChangeSuite) TestCreateUniqueExpressionIndex(c *C) {
Expand Down
Loading

0 comments on commit a0dfc56

Please sign in to comment.