Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DDL: fix a bug in column charset and collate when create table and modify column (#11300) #11492

Merged
merged 4 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 68 additions & 11 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,34 @@ func typesNeedCharset(tp byte) bool {
return false
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
func setCharsetCollationFlenDecimal(tp *types.FieldType, specifiedCollates []string, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
if typesNeedCharset(tp.Tp) {
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
if len(specifiedCollates) == 0 {
// Both the charset and collate are not specified.
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
} else {
// The charset is not specified but the collate is.
// We should derive charset from it's collate specified rather than getting from table and db.
// It is handled like mysql's logic, use derived charset to judge conflict with next collate.
for _, spc := range specifiedCollates {
derivedCollation, err := charset.GetCollationByName(spc)
if err != nil {
return errors.Trace(err)
}
if len(tp.Charset) == 0 {
tp.Charset = derivedCollation.CharsetName
} else if tp.Charset != derivedCollation.CharsetName {
return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset)
}
tp.Collate = derivedCollation.Name
}
}
} else {
tp.Charset = charset.CharsetBin
Expand All @@ -314,10 +333,25 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh
return errUnsupportedCharset.GenWithStackByArgs(tp.Charset, tp.Collate)
}
if len(tp.Collate) == 0 {
var err error
tp.Collate, err = charset.GetDefaultCollation(tp.Charset)
if err != nil {
return errors.Trace(err)
if len(specifiedCollates) == 0 {
// The charset is specified, but the collate is not.
var err error
tp.Collate, err = charset.GetDefaultCollation(tp.Charset)
if err != nil {
return errors.Trace(err)
}
} else {
// Both the charset and collate are specified.
for _, spc := range specifiedCollates {
derivedCollation, err := charset.GetCollationByName(spc)
if err != nil {
return errors.Trace(err)
}
if tp.Charset != derivedCollation.CharsetName {
return ErrCollationCharsetMismatch.GenWithStackByArgs(derivedCollation.Name, tp.Charset)
}
tp.Collate = derivedCollation.Name
}
}
}
}
Expand All @@ -341,7 +375,10 @@ func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCh
// outPriKeyConstraint is the primary key constraint out of column definition. such as: create table t1 (id int , age int, primary key(id));
func buildColumnAndConstraint(ctx sessionctx.Context, offset int,
colDef *ast.ColumnDef, outPriKeyConstraint *ast.Constraint, tblCharset, dbCharset string) (*table.Column, []*ast.Constraint, error) {
if err := setCharsetCollationFlenDecimal(colDef.Tp, tblCharset, dbCharset); err != nil {
// specifiedCollates refers to collates in colDef.Options, should handle them together.
specifiedCollates := extractCollateFromOption(colDef)

if err := setCharsetCollationFlenDecimal(colDef.Tp, specifiedCollates, tblCharset, dbCharset); err != nil {
return nil, nil, errors.Trace(err)
}
col, cts, err := columnDefToCol(ctx, offset, colDef, outPriKeyConstraint)
Expand Down Expand Up @@ -2516,7 +2553,11 @@ func (d *ddl) getModifiableColumnJob(ctx sessionctx.Context, ident ast.Ident, or
newCol.FieldType.Charset = col.FieldType.Charset
newCol.FieldType.Collate = col.FieldType.Collate
}
err = setCharsetCollationFlenDecimal(&newCol.FieldType, t.Meta().Charset, schema.Charset)
// specifiedCollates refers to collates in colDef.Option. When setting charset and collate here we
// should take the collate in colDef.Option into consideration rather than handling it separately
specifiedCollates := extractCollateFromOption(specNewColumn)

err = setCharsetCollationFlenDecimal(&newCol.FieldType, specifiedCollates, t.Meta().Charset, schema.Charset)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -3266,3 +3307,19 @@ func buildPartitionInfo(meta *model.TableInfo, d *ddl, spec *ast.AlterTableSpec)
}
return part, nil
}

// extractCollateFromOption take collates(may multiple) in option into consideration
// when handle charset and collate of a column, rather than handling it separately.
func extractCollateFromOption(def *ast.ColumnDef) []string {
specifiedCollates := make([]string, 0, 0)
for i := 0; i < len(def.Options); i++ {
op := def.Options[i]
if op.Tp == ast.ColumnOptionCollate {
specifiedCollates = append(specifiedCollates, op.StrValue)
def.Options = append(def.Options[:i], def.Options[i+1:]...)
// maintain the correct index
i--
}
}
return specifiedCollates
}
67 changes: 67 additions & 0 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,36 @@ func (s *testSuite3) TestCreateTable(c *C) {
}
}

// test multiple collate specified in column when create.
tk.MustExec("drop table if exists test_multiple_column_collate;")
tk.MustExec("create table test_multiple_column_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8")
c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

tk.MustExec("drop table if exists test_multiple_column_collate;")
tk.MustExec("create table test_multiple_column_collate (a char(1) charset utf8 collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("test_multiple_column_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8")
c.Assert(t.Cols()[0].Collate, Equals, "utf8_general_ci")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

// test Err case for multiple collate specified in column when create.
tk.MustExec("drop table if exists test_err_multiple_collate;")
_, err = tk.Exec("create table test_err_multiple_collate (a char(1) charset utf8mb4 collate utf8_unicode_ci collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_unicode_ci", "utf8mb4").Error())

tk.MustExec("drop table if exists test_err_multiple_collate;")
_, err = tk.Exec("create table test_err_multiple_collate (a char(1) collate utf8_unicode_ci collate utf8mb4_general_ci) charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_general_ci", "utf8").Error())

// table option is auto-increment
tk.MustExec("drop table if exists create_auto_increment_test;")
tk.MustExec("create table create_auto_increment_test (id int not null auto_increment, name varchar(255), primary key(id)) auto_increment = 999;")
Expand Down Expand Up @@ -322,6 +352,43 @@ func (s *testSuite3) TestAlterTableModifyColumn(c *C) {
_, err = tk.Exec("alter table alter_view modify column c2 text")
c.Assert(err.Error(), Equals, ddl.ErrWrongObject.GenWithStackByArgs("test", "alter_view", "BASE TABLE").Error())
tk.MustExec("drop view alter_view")

// test multiple collate modification in column.
tk.MustExec("drop table if exists modify_column_multiple_collate")
tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) collate utf8mb4_bin;")
c.Assert(err, IsNil)
t, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4")
c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

tk.MustExec("drop table if exists modify_column_multiple_collate;")
tk.MustExec("create table modify_column_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table modify_column_multiple_collate modify column a char(1) charset utf8mb4 collate utf8mb4_bin;")
c.Assert(err, IsNil)
t, err = domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("modify_column_multiple_collate"))
c.Assert(err, IsNil)
c.Assert(t.Cols()[0].Charset, Equals, "utf8mb4")
c.Assert(t.Cols()[0].Collate, Equals, "utf8mb4_bin")
c.Assert(t.Meta().Charset, Equals, "utf8mb4")
c.Assert(t.Meta().Collate, Equals, "utf8mb4_bin")

// test Err case for multiple collate modification in column.
tk.MustExec("drop table if exists err_modify_multiple_collate;")
tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) charset utf8mb4 collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8_bin", "utf8mb4").Error())

tk.MustExec("drop table if exists err_modify_multiple_collate;")
tk.MustExec("create table err_modify_multiple_collate (a char(1) collate utf8_bin collate utf8_general_ci) charset utf8mb4 collate utf8mb4_bin")
_, err = tk.Exec("alter table err_modify_multiple_collate modify column a char(1) collate utf8_bin collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, ddl.ErrCollationCharsetMismatch.GenWithStackByArgs("utf8mb4_bin", "utf8").Error())

}

func (s *testSuite3) TestDefaultDBAfterDropCurDB(c *C) {
Expand Down