Skip to content

Commit

Permalink
ddl: remove onDropColumns and onAddColumns (#35862)
Browse files Browse the repository at this point in the history
ref #14766
  • Loading branch information
tangenta authored Jun 30, 2022
1 parent 16df194 commit bc00ddb
Show file tree
Hide file tree
Showing 12 changed files with 7 additions and 536 deletions.
276 changes: 0 additions & 276 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,50 +250,6 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}

func checkAddColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, []*model.ColumnInfo, []*ast.ColumnPosition, []int, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
columns := []*model.ColumnInfo{}
positions := []*ast.ColumnPosition{}
offsets := []int{}
ifNotExists := []bool{}
err = job.DecodeArgs(&columns, &positions, &offsets, &ifNotExists)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}

columnInfos := make([]*model.ColumnInfo, 0, len(columns))
newColumns := make([]*model.ColumnInfo, 0, len(columns))
newPositions := make([]*ast.ColumnPosition, 0, len(columns))
newOffsets := make([]int, 0, len(columns))
newIfNotExists := make([]bool, 0, len(columns))
for i, col := range columns {
columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
if ifNotExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn("[ddl] check add columns, duplicate column", zap.Stringer("col", col.Name))
continue
}
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
columnInfos = append(columnInfos, columnInfo)
}
newColumns = append(newColumns, columns[i])
newPositions = append(newPositions, positions[i])
newOffsets = append(newOffsets, offsets[i])
newIfNotExists = append(newIfNotExists, ifNotExists[i])
}
return tblInfo, columnInfos, newColumns, newPositions, newOffsets, newIfNotExists, nil
}

func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
for i := range columnInfos {
columnInfos[i].State = state
Expand All @@ -318,238 +274,6 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
}
}

func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumns(d, t, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})

tblInfo, columnInfos, columns, positions, offsets, ifNotExists, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(columnInfos) == 0 {
if len(columns) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfoWithPosCheck(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add columns job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
positions[i] = pos
offsets[i] = offset
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
columnInfos = append(columnInfos, columnInfo)
}
// Set arg to job.
job.Args = []interface{}{columnInfos, positions, offsets, ifNotExists}
}

originalState := columnInfos[0].State
switch columnInfos[0].State {
case model.StateNone:
// none -> delete only
setColumnsState(columnInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> write only
setColumnsState(columnInfos, model.StateWriteOnly)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> reorganization
setColumnsState(columnInfos, model.StateWriteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
oldCols := tblInfo.Columns[:len(tblInfo.Columns)-len(offsets)]
newCols := tblInfo.Columns[len(tblInfo.Columns)-len(offsets):]
tblInfo.Columns = oldCols
for i := range offsets {
// For multiple columns with after position, should adjust offsets.
// e.g. create table t(a int);
// alter table t add column b int after a, add column c int after a;
// alter table t add column a1 int after a, add column b1 int after b, add column c1 int after c;
// alter table t add column a1 int after a, add column b1 int first;
if positions[i].Tp == ast.ColumnPositionAfter {
for j := 0; j < i; j++ {
if (positions[j].Tp == ast.ColumnPositionAfter && offsets[j] < offsets[i]) || positions[j].Tp == ast.ColumnPositionFirst {
offsets[i]++
}
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
tblInfo.MoveColumnInfo(len(tblInfo.Columns)-1, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &ddlutil.Event{Tp: model.ActionAddColumns, TableInfo: tblInfo, ColumnInfos: columnInfos})
default:
err = dbterror.ErrInvalidDDLState.GenWithStackByArgs("column", columnInfos[0].State)
}

return ver, errors.Trace(err)
}

func onDropColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfos, delCount, idxInfos, err := checkDropColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(colInfos) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}

originalState := colInfos[0].State
switch colInfos[0].State {
case model.StatePublic:
// public -> write only
setColumnsState(colInfos, model.StateWriteOnly)
setIndicesState(idxInfos, model.StateWriteOnly)
for _, colInfo := range colInfos {
err = checkDropColumnForStatePublic(tblInfo, colInfo)
if err != nil {
return ver, errors.Trace(err)
}
}
ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> delete only
setColumnsState(colInfos, model.StateDeleteOnly)
if len(idxInfos) > 0 {
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if !indexInfoContains(idx.ID, idxInfos) {
newIndices = append(newIndices, idx)
}
}
tblInfo.Indices = newIndices
}
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
setColumnsState(colInfos, model.StateDeleteReorganization)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
tblInfo.Columns = tblInfo.Columns[:len(tblInfo.Columns)-delCount]
setColumnsState(colInfos, model.StateNone)
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}

// Finish this job.
if job.IsRollingback() {
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
} else {
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, getPartitionIDs(tblInfo))
}
default:
err = dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
}
return ver, errors.Trace(err)
}

func checkDropColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, int, []*model.IndexInfo, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, 0, nil, errors.Trace(err)
}

var colNames []model.CIStr
var ifExists []bool
// indexIds is used to make sure we don't truncate args when decoding the rawArgs.
var indexIds []int64
err = job.DecodeArgs(&colNames, &ifExists, &indexIds)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
}

newColNames := make([]model.CIStr, 0, len(colNames))
colInfos := make([]*model.ColumnInfo, 0, len(colNames))
newIfExists := make([]bool, 0, len(colNames))
indexInfos := make([]*model.IndexInfo, 0)
for i, colName := range colNames {
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
if ifExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn(fmt.Sprintf("column %s doesn't exist", colName))
continue
}
job.State = model.JobStateCancelled
return nil, nil, 0, nil, dbterror.ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
}
if err = isDroppableColumn(job.MultiSchemaInfo != nil, tblInfo, colName); err != nil {
job.State = model.JobStateCancelled
return nil, nil, 0, nil, errors.Trace(err)
}
newColNames = append(newColNames, colName)
newIfExists = append(newIfExists, ifExists[i])
colInfos = append(colInfos, colInfo)
idxInfos := listIndicesWithColumn(colName.L, tblInfo.Indices)
indexInfos = append(indexInfos, idxInfos...)
}
job.Args = []interface{}{newColNames, newIfExists}
if len(indexIds) > 0 {
job.Args = append(job.Args, indexIds)
}
return tblInfo, colInfos, len(colInfos), indexInfos, nil
}

func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) {
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
return errors.Trace(historyJob.Error)
}
// Only for JobStateCancelled job which is adding columns or drop columns or drop indexes.
if historyJob.IsCancelled() && (historyJob.Type == model.ActionAddColumns || historyJob.Type == model.ActionDropColumns || historyJob.Type == model.ActionDropIndexes) {
if historyJob.IsCancelled() && (historyJob.Type == model.ActionDropIndexes) {
if historyJob.MultiSchemaInfo != nil && len(historyJob.MultiSchemaInfo.Warnings) != 0 {
for _, warning := range historyJob.MultiSchemaInfo.Warnings {
ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
Expand Down
Loading

0 comments on commit bc00ddb

Please sign in to comment.