Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Support the operation of adding multi-columns #15540

Merged
merged 47 commits into from
Apr 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ba3ca9f
Support the operation of adding multi-columns
gauss1314 Mar 20, 2020
154668a
clean code
gauss1314 Mar 21, 2020
5818799
Merge branch 'master' into supportAddMultiColumns
gauss1314 Mar 21, 2020
eba0fee
update go.mod
gauss1314 Mar 21, 2020
7708e6e
add drop columns
gauss1314 Mar 22, 2020
15af5d4
fix offsets for multiple after positions
gauss1314 Mar 22, 2020
c7a4041
add test
gauss1314 Mar 22, 2020
5794d50
Merge branch 'master' into supportAddMultiColumns
gauss1314 Mar 22, 2020
e156feb
fix tidy
gauss1314 Mar 24, 2020
b83a160
fix check
gauss1314 Mar 24, 2020
a25b477
Merge branch 'master' into supportAddMultiColumns
gauss1314 Mar 24, 2020
244e308
extract a function to handle the duplicated code
gauss1314 Mar 24, 2020
de6964c
add test case
gauss1314 Mar 24, 2020
8521775
fix test
gauss1314 Mar 24, 2020
2426141
add drop multi-columns
gauss1314 Mar 25, 2020
c3b1469
fix offset bug and drop columns bug
gauss1314 Mar 25, 2020
d5cfbba
add rollingback test
gauss1314 Mar 25, 2020
3b0afad
add more test case
gauss1314 Mar 26, 2020
d6cb328
polish test code
gauss1314 Mar 26, 2020
1e59459
refactor insertColStats2KV
gauss1314 Mar 26, 2020
93c1e50
Merge branch 'master' into supportAddMultiColumns
gauss1314 Mar 26, 2020
5ee862b
remove useless if
gauss1314 Mar 26, 2020
c963f42
fix HandleDDLEvent bug
gauss1314 Mar 26, 2020
86b8dea
add comments and clean code
gauss1314 Mar 26, 2020
538878c
fix bug
gauss1314 Mar 26, 2020
c9b8dd0
polish code
gauss1314 Mar 26, 2020
c2bf0b2
Update ddl/ddl_api.go
gauss1314 Mar 26, 2020
b744c86
add checkIsDroppableColumn function
gauss1314 Mar 27, 2020
0f6c25a
support if_not_exists and if_exists, add integration test cases
gauss1314 Mar 27, 2020
a31407d
Merge branch 'master' into supportAddMultiColumns
gauss1314 Mar 27, 2020
32d6358
improve readability
gauss1314 Mar 27, 2020
00350ac
polish code
gauss1314 Mar 30, 2020
1db980c
polish code
gauss1314 Mar 31, 2020
1098274
Merge branch 'master' into supportAddMultiColumns
gauss1314 Mar 31, 2020
97f6933
polish code and add test cases
gauss1314 Apr 2, 2020
d53c704
remove useless code
gauss1314 Apr 2, 2020
2c6f0f5
[WIP]add test case in db_change_test.go
gauss1314 Apr 2, 2020
2783bb0
fix TestParallelDropColumns
gauss1314 Apr 3, 2020
51b5cea
fix db_change_test.go
gauss1314 Apr 7, 2020
416a8ea
Merge branch 'master' into supportAddMultiColumns
gauss1314 Apr 7, 2020
a285200
fix conflict
gauss1314 Apr 7, 2020
fedc591
Merge branch 'master' into supportAddMultiColumns
zimulala Apr 8, 2020
165de08
Update ddl/column.go
gauss1314 Apr 8, 2020
72b8867
update comments
gauss1314 Apr 8, 2020
9d9dd11
Merge branch 'master' into supportAddMultiColumns
gauss1314 Apr 8, 2020
ab46275
Merge branch 'master' into supportAddMultiColumns
zimulala Apr 8, 2020
d6696f1
Merge branch 'master' into supportAddMultiColumns
zimulala Apr 8, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 283 additions & 28 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,25 @@ func adjustColumnInfoInDropColumn(tblInfo *model.TableInfo, offset int) {
tblInfo.Columns = newCols
}

func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) {
func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, *ast.ColumnPosition, int, error) {
// Check column name duplicate.
cols := tblInfo.Columns
position := len(cols)

// Get column position.
offset := len(cols)
// Should initialize pos when it is nil.
if pos == nil {
pos = &ast.ColumnPosition{}
}
// Get column offset.
if pos.Tp == ast.ColumnPositionFirst {
position = 0
offset = 0
} else if pos.Tp == ast.ColumnPositionAfter {
c := model.FindColumnInfo(cols, pos.RelativeColumn.Name.L)
if c == nil {
return nil, 0, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
return nil, pos, 0, infoschema.ErrColumnNotExists.GenWithStackByArgs(pos.RelativeColumn, tblInfo.Name)
}

// Insert position is after the mentioned column.
position = c.Offset + 1
// Insert offset is after the mentioned column.
offset = c.Offset + 1
}
colInfo.ID = allocateColumnID(tblInfo)
colInfo.State = model.StateNone
Expand All @@ -121,13 +124,13 @@ func createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *
colInfo.Offset = len(cols)

// Append the column info to the end of the tblInfo.Columns.
// It will reorder to the right position in "Columns" when it state change to public.
// It will reorder to the right offset in "Columns" when it state change to public.
newCols := make([]*model.ColumnInfo, 0, len(cols)+1)
newCols = append(newCols, cols...)
newCols = append(newCols, colInfo)

tblInfo.Columns = newCols
return colInfo, position, nil
return colInfo, pos, offset, nil
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo, *ast.ColumnPosition, int, error) {
Expand Down Expand Up @@ -177,7 +180,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
return ver, errors.Trace(err)
}
if columnInfo == nil {
columnInfo, offset, err = createColumnInfo(tblInfo, col, pos)
columnInfo, _, offset, err = createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -222,14 +225,279 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)

// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfo: columnInfo})
asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumn, TableInfo: tblInfo, ColumnInfos: []*model.ColumnInfo{columnInfo}})
default:
err = ErrInvalidDDLState.GenWithStackByArgs("column", columnInfo.State)
}

return ver, errors.Trace(err)
}

func checkAddColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, []*model.ColumnInfo, []*ast.ColumnPosition, []int, []bool, error) {
schemaID := job.SchemaID
tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
columns := []*model.ColumnInfo{}
positions := []*ast.ColumnPosition{}
offsets := []int{}
ifNotExists := []bool{}
err = job.DecodeArgs(&columns, &positions, &offsets, &ifNotExists)
if err != nil {
gauss1314 marked this conversation as resolved.
Show resolved Hide resolved
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, errors.Trace(err)
}
zimulala marked this conversation as resolved.
Show resolved Hide resolved

columnInfos := make([]*model.ColumnInfo, 0, len(columns))
newColumns := make([]*model.ColumnInfo, 0, len(columns))
newPositions := make([]*ast.ColumnPosition, 0, len(columns))
newOffsets := make([]int, 0, len(columns))
newIfNotExists := make([]bool, 0, len(columns))
for i, col := range columns {
columnInfo := model.FindColumnInfo(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// We already have a column with the same column name.
if ifNotExists[i] {
// TODO: Should return a warning.
logutil.BgLogger().Warn("[ddl] check add columns, duplicate column", zap.Stringer("col", col.Name))
continue
}
job.State = model.JobStateCancelled
return nil, nil, nil, nil, nil, nil, infoschema.ErrColumnExists.GenWithStackByArgs(col.Name)
}
columnInfos = append(columnInfos, columnInfo)
}
newColumns = append(newColumns, columns[i])
newPositions = append(newPositions, positions[i])
newOffsets = append(newOffsets, offsets[i])
newIfNotExists = append(newIfNotExists, ifNotExists[i])
}
return tblInfo, columnInfos, newColumns, newPositions, newOffsets, newIfNotExists, nil
}

// setColumnsState assigns the given schema state to every column info in
// columnInfos. The column infos are modified in place through their pointers.
func setColumnsState(columnInfos []*model.ColumnInfo, state model.SchemaState) {
	for _, col := range columnInfos {
		col.State = state
	}
}

func onAddColumns(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumns(t, job)
gauss1314 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
}

failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
})

tblInfo, columnInfos, columns, positions, offsets, ifNotExists, err := checkAddColumns(t, job)
if err != nil {
return ver, errors.Trace(err)
}
if len(columnInfos) == 0 {
if len(columns) == 0 {
job.State = model.JobStateCancelled
return ver, nil
}
for i := range columns {
columnInfo, pos, offset, err := createColumnInfo(tblInfo, columns[i], positions[i])
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
logutil.BgLogger().Info("[ddl] run add columns job", zap.String("job", job.String()), zap.Reflect("columnInfo", *columnInfo), zap.Int("offset", offset))
positions[i] = pos
offsets[i] = offset
if err = checkAddColumnTooManyColumns(len(tblInfo.Columns)); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
columnInfos = append(columnInfos, columnInfo)
}
// Set arg to job.
job.Args = []interface{}{columnInfos, positions, offsets, ifNotExists}
}

originalState := columnInfos[0].State
switch columnInfos[0].State {
case model.StateNone:
// none -> delete only
job.SchemaState = model.StateDeleteOnly
setColumnsState(columnInfos, model.StateDeleteOnly)
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != columnInfos[0].State)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
setColumnsState(columnInfos, model.StateWriteOnly)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
case model.StateWriteOnly:
// write only -> reorganization
job.SchemaState = model.StateWriteReorganization
setColumnsState(columnInfos, model.StateWriteReorganization)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
case model.StateWriteReorganization:
// reorganization -> public
// Adjust table column offsets.
oldCols := tblInfo.Columns[:len(tblInfo.Columns)-len(offsets)]
newCols := tblInfo.Columns[len(tblInfo.Columns)-len(offsets):]
tblInfo.Columns = oldCols
for i := range offsets {
// For multiple columns with after position, should adjust offsets.
// e.g. create table t(a int);
// alter table t add column b int after a, add column c int after a;
// alter table t add column a1 int after a, add column b1 int after b, add column c1 int after c;
// alter table t add column a1 int after a, add column b1 int first;
if positions[i].Tp == ast.ColumnPositionAfter {
gauss1314 marked this conversation as resolved.
Show resolved Hide resolved
for j := 0; j < i; j++ {
if (positions[j].Tp == ast.ColumnPositionAfter && offsets[j] < offsets[i]) || positions[j].Tp == ast.ColumnPositionFirst {
offsets[i]++
}
}
}
tblInfo.Columns = append(tblInfo.Columns, newCols[i])
adjustColumnInfoInAddColumn(tblInfo, offsets[i])
}
setColumnsState(columnInfos, model.StatePublic)
ver, err = updateVersionAndTableInfo(t, job, tblInfo, originalState != columnInfos[0].State)
if err != nil {
return ver, errors.Trace(err)
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
asyncNotifyEvent(d, &util.Event{Tp: model.ActionAddColumns, TableInfo: tblInfo, ColumnInfos: columnInfos})
default:
err = ErrInvalidDDLState.GenWithStackByArgs("column", columnInfos[0].State)
}

return ver, errors.Trace(err)
}

// onDropColumns advances a "drop columns" job by one schema-state step.
//
// The columns to drop are resolved by checkDropColumns; when none remain the
// job is cancelled. The state of the first column drives the transition that
// is applied to all of them:
//
//	public -> write only -> delete only -> delete reorganization -> none
//
// On the final transition the last delCount entries of tblInfo.Columns are
// removed and the job is finished, as rollback-done when the job is rolling
// back and as done otherwise.
func onDropColumns(t *meta.Meta, job *model.Job) (ver int64, _ error) {
	tblInfo, colInfos, delCount, err := checkDropColumns(t, job)
	if err != nil {
		return ver, errors.Trace(err)
	}
	if len(colInfos) == 0 {
		job.State = model.JobStateCancelled
		return ver, nil
	}

	// head aliases the first column info, so head.State reflects the state
	// written by setColumnsState further down.
	head := colInfos[0]
	prevState := head.State
	switch prevState {
	case model.StatePublic:
		// public -> write only
		job.SchemaState = model.StateWriteOnly
		setColumnsState(colInfos, model.StateWriteOnly)
		for _, col := range colInfos {
			if err = checkDropColumnForStatePublic(tblInfo, col); err != nil {
				return ver, errors.Trace(err)
			}
		}
		ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, prevState != head.State)
	case model.StateWriteOnly:
		// write only -> delete only
		job.SchemaState = model.StateDeleteOnly
		setColumnsState(colInfos, model.StateDeleteOnly)
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != head.State)
	case model.StateDeleteOnly:
		// delete only -> reorganization
		job.SchemaState = model.StateDeleteReorganization
		setColumnsState(colInfos, model.StateDeleteReorganization)
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != head.State)
	case model.StateDeleteReorganization:
		// reorganization -> absent
		// Every reorganization step is finished; cut the columns off the tail.
		remaining := len(tblInfo.Columns) - delCount
		tblInfo.Columns = tblInfo.Columns[:remaining]
		setColumnsState(colInfos, model.StateNone)
		ver, err = updateVersionAndTableInfo(t, job, tblInfo, prevState != head.State)
		if err != nil {
			return ver, errors.Trace(err)
		}

		// Finish this job with the job state matching its direction.
		finalState := model.JobStateDone
		if job.IsRollingback() {
			finalState = model.JobStateRollbackDone
		}
		job.FinishTableJob(finalState, model.StateNone, ver, tblInfo)
	default:
		err = errInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
	}
	return ver, errors.Trace(err)
}

// checkDropColumns loads the table targeted by a "drop columns" job, decodes
// the job arguments (column names and per-column IF EXISTS flags) and resolves
// the column infos to drop.
//
// A column that is missing or hidden is skipped when its IF EXISTS flag is set;
// otherwise the job is cancelled. The job is also cancelled when the arguments
// cannot be decoded, when there are fewer IF EXISTS flags than column names,
// or when isDroppableColumn rejects a column.
//
// On success the job arguments are rewritten with the remaining column names
// and flags. It returns the table info, the column infos to drop and their
// count.
func checkDropColumns(t *meta.Meta, job *model.Job) (*model.TableInfo, []*model.ColumnInfo, int, error) {
	schemaID := job.SchemaID
	tblInfo, err := getTableInfoAndCancelFaultJob(t, job, schemaID)
	if err != nil {
		return nil, nil, 0, errors.Trace(err)
	}

	var colNames []model.CIStr
	var ifExists []bool
	err = job.DecodeArgs(&colNames, &ifExists)
	if err != nil {
		job.State = model.JobStateCancelled
		return nil, nil, 0, errors.Trace(err)
	}
	// ifExists is indexed in lockstep with colNames below. Reject a malformed
	// job here instead of panicking with an index-out-of-range error.
	if len(ifExists) < len(colNames) {
		job.State = model.JobStateCancelled
		return nil, nil, 0, errors.Errorf(
			"invalid drop columns job args: %d column names, %d if-exists flags",
			len(colNames), len(ifExists))
	}

	newColNames := make([]model.CIStr, 0, len(colNames))
	colInfos := make([]*model.ColumnInfo, 0, len(colNames))
	newIfExists := make([]bool, 0, len(colNames))
	for i, colName := range colNames {
		colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
		if colInfo == nil || colInfo.Hidden {
			if ifExists[i] {
				// TODO: Should return a warning.
				logutil.BgLogger().Warn(fmt.Sprintf("column %s doesn't exist", colName))
				continue
			}
			job.State = model.JobStateCancelled
			return nil, nil, 0, ErrCantDropFieldOrKey.GenWithStack("column %s doesn't exist", colName)
		}
		if err = isDroppableColumn(tblInfo, colName); err != nil {
			job.State = model.JobStateCancelled
			return nil, nil, 0, errors.Trace(err)
		}
		newColNames = append(newColNames, colName)
		newIfExists = append(newIfExists, ifExists[i])
		colInfos = append(colInfos, colInfo)
	}
	// Keep only the columns that will actually be dropped in the job arguments.
	job.Args = []interface{}{newColNames, newIfExists}
	return tblInfo, colInfos, len(colInfos), nil
}

// checkDropColumnForStatePublic prepares a column that is leaving the public
// state: it moves the column to the last offset of the table and, for a
// not-null column with no original default value, generates one.
//
// It returns the error reported by generateOriginDefaultValue, if any.
func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) {
	// Move this column to the last offset and reset the offsets of all columns after it.
	adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)

	// Only a not-null column that lacks an original default value needs one
	// generated, so that the column value can be backfilled like "add column".
	// NOTE: If the StateWriteOnly state can ever be rolled back, the original
	// default value should be reconsidered, and so should columns without the
	// not-null flag.
	if colInfo.OriginDefaultValue != nil || !mysql.HasNotNullFlag(colInfo.Flag) {
		return nil
	}

	// For a timestamp column defaulting to current_timestamp, a new-version DDL
	// owner (column.Version set to 1) stores the default as UTC time, while an
	// old-version TiDB updating records during the write-only stage would read
	// it as a time in the system timezone and so use a wrong default value.
	// This is currently harmless: a running drop column job cannot be
	// cancelled, so the column is always dropped and clients never see the
	// wrong default value. More info about this problem, see PR#9115.
	colInfo.OriginDefaultValue, err = generateOriginDefaultValue(colInfo)
	return err
}

func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, err := checkDropColumn(t, job)
if err != nil {
Expand All @@ -242,22 +510,9 @@ func onDropColumn(t *meta.Meta, job *model.Job) (ver int64, _ error) {
// public -> write only
job.SchemaState = model.StateWriteOnly
colInfo.State = model.StateWriteOnly
// Set this column's offset to the last and reset all following columns' offsets.
adjustColumnInfoInDropColumn(tblInfo, colInfo.Offset)
// When the dropping column has not-null flag and it hasn't the default value, we can backfill the column value like "add column".
// NOTE: If the state of StateWriteOnly can be rollbacked, we'd better reconsider the original default value.
// And we need consider the column without not-null flag.
if colInfo.OriginDefaultValue == nil && mysql.HasNotNullFlag(colInfo.Flag) {
// If the column is timestamp default current_timestamp, and DDL owner is new version TiDB that set column.Version to 1,
// then old TiDB update record in the column write only stage will uses the wrong default value of the dropping column.
// Because new version of the column default value is UTC time, but old version TiDB will think the default value is the time in system timezone.
// But currently will be ok, because we can't cancel the drop column job when the job is running,
// so the column will be dropped succeed and client will never see the wrong default value of the dropped column.
// More info about this problem, see PR#9115.
colInfo.OriginDefaultValue, err = generateOriginDefaultValue(colInfo)
if err != nil {
return ver, errors.Trace(err)
}
err = checkDropColumnForStatePublic(tblInfo, colInfo)
if err != nil {
return ver, errors.Trace(err)
}
ver, err = updateVersionAndTableInfoWithCheck(t, job, tblInfo, originalState != colInfo.State)
case model.StateWriteOnly:
Expand Down
Loading