Skip to content

Commit

Permalink
ddl: add check for max index key length when alter modify/change colu…
Browse files Browse the repository at this point in the history
…mn (pingcap#11220)

Signed-off-by: crazycs520 <[email protected]>
  • Loading branch information
crazycs520 committed Apr 9, 2020
1 parent c21ca8f commit c3fcfb6
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 58 deletions.
15 changes: 15 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1894,6 +1894,21 @@ func (s *testDBSuite) TestAlterColumn(c *C) {
c.Assert(createSQL, Equals, expected)
_, err = s.tk.Exec("alter table mc modify column a bigint auto_increment") // Adds auto_increment should throw error
c.Assert(err, NotNil)

s.tk.MustExec("drop table if exists t")
// TODO: fix me, below sql should execute successfully. Currently, the result of calculate key length is wrong.
//s.tk.MustExec("create table t1 (a varchar(10),b varchar(100),c tinyint,d varchar(3071),index(a),index(a,b),index (c,d));")
s.tk.MustExec("create table t1 (a varchar(10),b varchar(100),c tinyint,d varchar(3068),index(a),index(a,b),index (c,d));")
_, err = s.tk.Exec("alter table t1 modify column a varchar(3000);")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1071]Specified key was too long; max key length is 3072 bytes")
// check modify column with rename column.
_, err = s.tk.Exec("alter table t1 change column a x varchar(3000);")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1071]Specified key was too long; max key length is 3072 bytes")
_, err = s.tk.Exec("alter table t1 modify column c bigint;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1071]Specified key was too long; max key length is 3072 bytes")
}

func (s *testDBSuite) mustExec(c *C, query string, args ...interface{}) {
Expand Down
41 changes: 41 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,10 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
return nil, errors.Trace(err)
}

if err = checkColumnWithIndexConstraint(t.Meta(), col.ColumnInfo, newCol.ColumnInfo); err != nil {
return nil, err
}

// As same with MySQL, we don't support modifying the stored status for generated columns.
if err = checkModifyGeneratedColumn(t.Cols(), col, newCol); err != nil {
return nil, errors.Trace(err)
Expand All @@ -2176,6 +2180,43 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
return job, nil
}

// checkColumnWithIndexConstraint is used to check the related index constraint of the modified column.
// Index has a max-prefix-length constraint. eg: a varchar(100), index idx(a), modifying column a to a varchar(4000)
// will cause index idx to break the max-prefix-length constraint.
func checkColumnWithIndexConstraint(tbInfo *model.TableInfo, originalCol, newCol *model.ColumnInfo) error {
var columns []*model.ColumnInfo
for _, indexInfo := range tbInfo.Indices {
containColumn := false
for _, col := range indexInfo.Columns {
if col.Name.L == originalCol.Name.L {
containColumn = true
break
}
}
if containColumn == false {
continue
}
if columns == nil {
columns = make([]*model.ColumnInfo, 0, len(tbInfo.Columns))
columns = append(columns, tbInfo.Columns...)
// replace old column with new column.
for i, col := range columns {
if col.Name.L != originalCol.Name.L {
continue
}
columns[i] = newCol.Clone()
columns[i].Name = originalCol.Name
break
}
}
err := checkIndexPrefixLength(columns, indexInfo.Columns)
if err != nil {
return err
}
}
return nil
}

// ChangeColumn renames an existing column and modifies the column's definition,
// currently we only support limited kind of changes
// that do not need to change or check data on the table.
Expand Down
152 changes: 94 additions & 58 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,71 +57,20 @@ func buildIndexColumns(columns []*model.ColumnInfo, idxColNames []*ast.IndexColN
sumLength := 0

for _, ic := range idxColNames {
col := model.FindColumnInfo(columns, ic.Column.Name.O)
col := model.FindColumnInfo(columns, ic.Column.Name.L)
if col == nil {
return nil, errKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ic.Column.Name)
}

if col.Flen == 0 && (types.IsTypeChar(col.FieldType.Tp) || types.IsTypeVarchar(col.FieldType.Tp)) {
return nil, errors.Trace(errWrongKeyColumn.GenWithStackByArgs(ic.Column.Name))
if err := checkIndexColumn(col, ic); err != nil {
return nil, err
}

// JSON column cannot index.
if col.FieldType.Tp == mysql.TypeJSON {
return nil, errors.Trace(errJSONUsedAsKey.GenWithStackByArgs(col.Name.O))
}

// Length must be specified for BLOB and TEXT column indexes.
if types.IsTypeBlob(col.FieldType.Tp) && ic.Length == types.UnspecifiedLength {
return nil, errors.Trace(errBlobKeyWithoutLength.GenWithStackByArgs(col.Name.O))
}

// Length can only be specified for specifiable types.
if ic.Length != types.UnspecifiedLength && !types.IsTypePrefixable(col.FieldType.Tp) {
return nil, errors.Trace(errIncorrectPrefixKey)
}

// Key length must be shorter or equal to the column length.
if ic.Length != types.UnspecifiedLength &&
types.IsTypeChar(col.FieldType.Tp) && col.Flen < ic.Length {
return nil, errors.Trace(errIncorrectPrefixKey)
}

// Specified length must be shorter than the max length for prefix.
if ic.Length > maxPrefixLength {
return nil, errors.Trace(errTooLongKey)
}

// Take care of the sum of length of all index columns.
if ic.Length != types.UnspecifiedLength {
sumLength += ic.Length
} else {
// Specified data types.
if col.Flen != types.UnspecifiedLength {
// Special case for the bit type.
if col.FieldType.Tp == mysql.TypeBit {
sumLength += (col.Flen + 7) >> 3
} else {
sumLength += col.Flen
}
} else {
if length, ok := mysql.DefaultLengthOfMysqlTypes[col.FieldType.Tp]; ok {
sumLength += length
} else {
return nil, errUnknownTypeLength.GenWithStackByArgs(col.FieldType.Tp)
}

// Special case for time fraction.
if types.IsTypeFractionable(col.FieldType.Tp) &&
col.FieldType.Decimal != types.UnspecifiedLength {
if length, ok := mysql.DefaultLengthOfTimeFraction[col.FieldType.Decimal]; ok {
sumLength += length
} else {
return nil, errUnknownFractionLength.GenWithStackByArgs(col.FieldType.Tp, col.FieldType.Decimal)
}
}
}
indexColumnLength, err := getIndexColumnLength(col, ic.Length)
if err != nil {
return nil, err
}
sumLength += indexColumnLength

// The sum of all lengths must be shorter than the max length for prefix.
if sumLength > maxPrefixLength {
Expand All @@ -138,6 +87,93 @@ func buildIndexColumns(columns []*model.ColumnInfo, idxColNames []*ast.IndexColN
return idxColumns, nil
}

func checkIndexPrefixLength(columns []*model.ColumnInfo, idxColumns []*model.IndexColumn) error {
// The sum of length of all index columns.
sumLength := 0
for _, ic := range idxColumns {
col := model.FindColumnInfo(columns, ic.Name.L)
if col == nil {
return errKeyColumnDoesNotExits.GenWithStack("column does not exist: %s", ic.Name)
}

indexColumnLength, err := getIndexColumnLength(col, ic.Length)
if err != nil {
return err
}
sumLength += indexColumnLength
// The sum of all lengths must be shorter than the max length for prefix.
if sumLength > maxPrefixLength {
return errors.Trace(errTooLongKey)
}
}
return nil
}

func checkIndexColumn(col *model.ColumnInfo, ic *ast.IndexColName) error {
if col.Flen == 0 && (types.IsTypeChar(col.FieldType.Tp) || types.IsTypeVarchar(col.FieldType.Tp)) {
return errors.Trace(errWrongKeyColumn.GenWithStackByArgs(ic.Column.Name))
}

// JSON column cannot index.
if col.FieldType.Tp == mysql.TypeJSON {
return errors.Trace(errJSONUsedAsKey.GenWithStackByArgs(col.Name.O))
}

// Length must be specified for BLOB and TEXT column indexes.
if types.IsTypeBlob(col.FieldType.Tp) && ic.Length == types.UnspecifiedLength {
return errors.Trace(errBlobKeyWithoutLength)
}

// Length can only be specified for specifiable types.
if ic.Length != types.UnspecifiedLength && !types.IsTypePrefixable(col.FieldType.Tp) {
return errors.Trace(errIncorrectPrefixKey)
}

// Key length must be shorter or equal to the column length.
if ic.Length != types.UnspecifiedLength &&
types.IsTypeChar(col.FieldType.Tp) && col.Flen < ic.Length {
return errors.Trace(errIncorrectPrefixKey)
}

// Specified length must be shorter than the max length for prefix.
if ic.Length > maxPrefixLength {
return errors.Trace(errTooLongKey)
}
return nil
}

func getIndexColumnLength(col *model.ColumnInfo, colLen int) (int, error) {
// Take care of the sum of length of all index columns.
if colLen != types.UnspecifiedLength {
return colLen, nil
}
// Specified data types.
if col.Flen != types.UnspecifiedLength {
// Special case for the bit type.
if col.FieldType.Tp == mysql.TypeBit {
return (col.Flen + 7) >> 3, nil
}
return col.Flen, nil

}

length, ok := mysql.DefaultLengthOfMysqlTypes[col.FieldType.Tp]
if !ok {
return length, errUnknownTypeLength.GenWithStackByArgs(col.FieldType.Tp)
}

// Special case for time fraction.
if types.IsTypeFractionable(col.FieldType.Tp) &&
col.FieldType.Decimal != types.UnspecifiedLength {
decimalLength, ok := mysql.DefaultLengthOfTimeFraction[col.FieldType.Decimal]
if !ok {
return length, errUnknownFractionLength.GenWithStackByArgs(col.FieldType.Tp, col.FieldType.Decimal)
}
length += decimalLength
}
return length, nil
}

func buildIndexInfo(tblInfo *model.TableInfo, indexName model.CIStr, idxColNames []*ast.IndexColName, state model.SchemaState) (*model.IndexInfo, error) {
if err := checkTooLongIndex(indexName); err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit c3fcfb6

Please sign in to comment.