Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: fix alter table charset bug and compatibility (#9790) #10714

Merged
merged 5 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 114 additions & 4 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,116 @@ func (s *testIntegrationSuite) TestChangingTableCharset(c *C) {
rs.Close()
}
c.Assert(err, NotNil)

rs, err = tk.Exec("alter table t charset utf8mb4")
if rs != nil {
rs.Close()
}
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify charset from latin1 to utf8mb4")

rs, err = tk.Exec("alter table t charset utf8mb4 collate utf8mb4_bin")
c.Assert(err, NotNil)

rs, err = tk.Exec("alter table t charset ''")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[parser:1115]Unknown character set: ''")

rs, err = tk.Exec("alter table t collate ''")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1273]Unknown collation: ''")

rs, err = tk.Exec("alter table t charset utf8mb4 collate '' collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1273]Unknown collation: ''")

rs, err = tk.Exec("alter table t charset latin1 charset utf8 charset utf8mb4 collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1302]Conflicting declarations: 'CHARACTER SET latin1' and 'CHARACTER SET utf8'")

rs, err = tk.Exec("alter table t charset utf8 collate utf8mb4_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1253]COLLATION 'utf8mb4_bin' is not valid for CHARACTER SET 'utf8'")

rs, err = tk.Exec("alter table t charset utf8 collate utf8_bin collate utf8mb4_bin collate utf8_bin;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:1253]COLLATION 'utf8mb4_bin' is not valid for CHARACTER SET 'utf8'")

// Test change column charset when changing table charset.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10)) charset utf8")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset := func() {
tbl := testGetTableByName(c, tk.Se, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Charset, Equals, charset.CharsetUTF8MB4)
c.Assert(tbl.Meta().Collate, Equals, charset.CollationUTF8MB4)
for _, col := range tbl.Meta().Columns {
c.Assert(col.Charset, Equals, charset.CharsetUTF8MB4)
c.Assert(col.Collate, Equals, charset.CollationUTF8MB4)
}
}
checkCharset()

// Test when column charset can not convert to the target charset.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10) character set ascii) charset utf8mb4")
_, err = tk.Exec("alter table t convert to charset utf8mb4;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify charset from ascii to utf8mb4")

// Test when table charset is equal to target charset but column charset is not equal.
tk.MustExec("drop table t;")
tk.MustExec("create table t(a varchar(10) character set utf8) charset utf8mb4")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()

// Mock table info with charset is "". Old TiDB maybe create table with charset is "".
db, ok := domain.GetDomain(tk.Se).InfoSchema().SchemaByName(model.NewCIStr("test"))
c.Assert(ok, IsTrue)
tbl := testGetTableByName(c, tk.Se, "test", "t")
tblInfo := tbl.Meta().Clone()
tblInfo.Charset = ""
tblInfo.Collate = ""
updateTableInfo := func(tblInfo *model.TableInfo) {
mockCtx := mock.NewContext()
mockCtx.Store = s.store
err = mockCtx.NewTxn()
c.Assert(err, IsNil)
txn, err := mockCtx.Txn(true)
c.Assert(err, IsNil)
mt := meta.NewMeta(txn)

err = mt.UpdateTable(db.ID, tblInfo)
c.Assert(err, IsNil)
err = txn.Commit(context.Background())
c.Assert(err, IsNil)
}
updateTableInfo(tblInfo)

// check table charset is ""
tk.MustExec("alter table t add column b varchar(10);") // load latest schema.
tbl = testGetTableByName(c, tk.Se, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Charset, Equals, "")
c.Assert(tbl.Meta().Collate, Equals, "")
// Test when table charset is "", this for compatibility.
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()

// Test when column charset is "".
tbl = testGetTableByName(c, tk.Se, "test", "t")
tblInfo = tbl.Meta().Clone()
tblInfo.Columns[0].Charset = ""
tblInfo.Columns[0].Collate = ""
updateTableInfo(tblInfo)
// check table charset is ""
tk.MustExec("alter table t drop column b;") // load latest schema.
tbl = testGetTableByName(c, tk.Se, "test", "t")
c.Assert(tbl, NotNil)
c.Assert(tbl.Meta().Columns[0].Charset, Equals, "")
c.Assert(tbl.Meta().Columns[0].Collate, Equals, "")
tk.MustExec("alter table t convert to charset utf8mb4;")
checkCharset()
}

func (s *testIntegrationSuite) TestCaseInsensitiveCharsetAndCollate(c *C) {
Expand Down Expand Up @@ -513,15 +623,15 @@ func (s *testIntegrationSuite) TestIgnoreColumnUTF8Charset(c *C) {

// Test for alter table convert charset
config.GetGlobalConfig().TreatOldVersionUTF8AsUTF8MB4 = true
tk.MustExec("alter table t change column b b varchar(40) character set ascii") // reload schema.
tk.MustExec("alter table t drop column b") // reload schema.
tk.MustExec("alter table t convert to charset utf8mb4;")

config.GetGlobalConfig().TreatOldVersionUTF8AsUTF8MB4 = false
tk.MustExec("alter table t change column b b varchar(50) CHARSET ascii") // reload schema.
tk.MustExec("alter table t add column b varchar(50);") // reload schema.
// TODO: fix this after PR 9790.
tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n" +
" `a` varchar(20) CHARSET utf8 COLLATE utf8_bin DEFAULT NULL,\n" +
" `b` varchar(50) CHARSET ascii COLLATE ascii_bin DEFAULT NULL\n" +
" `a` varchar(20) DEFAULT NULL,\n" +
" `b` varchar(50) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
}

Expand Down
9 changes: 9 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ var (
ErrWrongNameForIndex = terror.ClassDDL.New(codeWrongNameForIndex, mysql.MySQLErrName[mysql.ErrWrongNameForIndex])
// ErrUnknownCharacterSet returns unknown character set.
ErrUnknownCharacterSet = terror.ClassDDL.New(codeUnknownCharacterSet, "Unknown character set: '%s'")
// ErrUnknownCollation returns unknown collation.
ErrUnknownCollation = terror.ClassDDL.New(codeUnknownCollation, "Unknown collation: '%s'")
// ErrCollationCharsetMismatch returns when collation not match the charset.
ErrCollationCharsetMismatch = terror.ClassDDL.New(codeCollationCharsetMismatch, mysql.MySQLErrName[mysql.ErrCollationCharsetMismatch])
// ErrConflictingDeclarations return conflict declarations.
ErrConflictingDeclarations = terror.ClassDDL.New(codeConflictingDeclarations, "Conflicting declarations: 'CHARACTER SET %s' and 'CHARACTER SET %s'")
// ErrPrimaryCantHaveNull returns All parts of a PRIMARY KEY must be NOT NULL; if you need NULL in a key, use UNIQUE instead
Expand Down Expand Up @@ -621,6 +625,8 @@ const (
codeWrongNameForIndex = terror.ErrCode(mysql.ErrWrongNameForIndex)
codeErrTooLongIndexComment = terror.ErrCode(mysql.ErrTooLongIndexComment)
codeUnknownCharacterSet = terror.ErrCode(mysql.ErrUnknownCharacterSet)
codeUnknownCollation = terror.ErrCode(mysql.ErrUnknownCollation)
codeCollationCharsetMismatch = terror.ErrCode(mysql.ErrCollationCharsetMismatch)
codeConflictingDeclarations = terror.ErrCode(mysql.ErrConflictingDeclarations)
codeCantCreateTable = terror.ErrCode(mysql.ErrCantCreateTable)
codeTableMustHaveColumns = terror.ErrCode(mysql.ErrTableMustHaveColumns)
Expand Down Expand Up @@ -690,6 +696,9 @@ func init() {
codePrimaryCantHaveNull: mysql.ErrPrimaryCantHaveNull,
codeWrongExprInPartitionFunc: mysql.ErrWrongExprInPartitionFunc,
codeUnknownPartition: mysql.ErrUnknownPartition,
codeUnknownCollation: mysql.ErrUnknownCollation,
codeCollationCharsetMismatch: mysql.ErrCollationCharsetMismatch,
codeConflictingDeclarations: mysql.ErrConflictingDeclarations,
}
terror.ErrClassToMySQLCodes[terror.ClassDDL] = ddlMySQLErrCodes
}
131 changes: 102 additions & 29 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,18 +275,27 @@ func ResolveCharsetCollation(tblCharset, dbCharset string) (string, string, erro
return charset, collate, nil
}

func typesNeedCharset(tp byte) bool {
switch tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString,
mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob,
mysql.TypeEnum, mysql.TypeSet:
return true
}
return false
}

func setCharsetCollationFlenDecimal(tp *types.FieldType, tblCharset string, dbCharset string) error {
tp.Charset = strings.ToLower(tp.Charset)
tp.Collate = strings.ToLower(tp.Collate)
if len(tp.Charset) == 0 {
switch tp.Tp {
case mysql.TypeString, mysql.TypeVarchar, mysql.TypeVarString, mysql.TypeBlob, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeEnum, mysql.TypeSet:
if typesNeedCharset(tp.Tp) {
var err error
tp.Charset, tp.Collate, err = ResolveCharsetCollation(tblCharset, dbCharset)
if err != nil {
return errors.Trace(err)
}
default:
} else {
tp.Charset = charset.CharsetBin
tp.Collate = charset.CharsetBin
}
Expand Down Expand Up @@ -1263,9 +1272,9 @@ func isIgnorableSpec(tp ast.AlterTableType) bool {
// getCharsetAndCollateInTableOption will iterate the charset and collate in the options,
// and returns the last charset and collate in options. If there is no charset in the options,
// the returns charset will be "", the same as collate.
func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption) (charset, collate string) {
charsets := make([]string, len(options))
collates := make([]string, len(options))
func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption) (ca, co string, err error) {
charsets := make([]string, 0, len(options))
collates := make([]string, 0, len(options))
for i := startIdx; i < len(options); i++ {
opt := options[i]
// we set the charset to the last option. example: alter table t charset latin1 charset utf8 collate utf8_bin;
Expand All @@ -1278,12 +1287,26 @@ func getCharsetAndCollateInTableOption(startIdx int, options []*ast.TableOption)
}
}

if len(charsets) != 0 {
charset = charsets[len(charsets)-1]
if len(charsets) > 1 {
return "", "", ErrConflictingDeclarations.GenWithStackByArgs(charsets[0], charsets[1])
}
if len(charsets) == 1 {
if charsets[0] == "" {
return "", "", ErrUnknownCharacterSet.GenWithStackByArgs("")
}
ca = charsets[0]
}

if len(collates) != 0 {
collate = collates[len(collates)-1]
for i := range collates {
if collates[i] == "" {
return "", "", ErrUnknownCollation.GenWithStackByArgs("")
}
if len(ca) != 0 && !charset.ValidCharsetAndCollation(ca, collates[i]) {
return "", "", ErrCollationCharsetMismatch.GenWithStackByArgs(collates[i], ca)
}
}
co = collates[len(collates)-1]
}
return
}
Expand Down Expand Up @@ -1371,7 +1394,11 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
if handledCharsetOrCollate {
continue
}
toCharset, toCollate := getCharsetAndCollateInTableOption(i, spec.Options)
var toCharset, toCollate string
toCharset, toCollate, err = getCharsetAndCollateInTableOption(i, spec.Options)
if err != nil {
return err
}
err = d.AlterTableCharsetAndCollate(ctx, ident, toCharset, toCollate)
handledCharsetOrCollate = true
}
Expand Down Expand Up @@ -1718,7 +1745,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, colName model.CIS
// modifiableCharsetAndCollation returns error when the charset or collation is not modifiable.
func modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate string) error {
if !charset.ValidCharsetAndCollation(toCharset, toCollate) {
return ErrUnknownCharacterSet.GenWithStackByArgs(toCharset, toCollate)
return ErrUnknownCharacterSet.GenWithStack("Unknown character set: '%s', collation: '%s'", toCharset, toCollate)
}
if toCharset == charset.CharsetUTF8MB4 && origCharset == charset.CharsetUTF8 {
// TiDB only allow utf8 to be changed to utf8mb4.
Expand Down Expand Up @@ -2152,11 +2179,9 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(infoschema.ErrTableNotExists.GenWithStackByArgs(ident.Schema, ident.Name))
}

origCharset := tb.Meta().Charset
origCollate := tb.Meta().Collate
if toCharset == "" {
// charset does not change.
toCharset = origCharset
toCharset = tb.Meta().Charset
}

if toCollate == "" {
Expand All @@ -2166,23 +2191,13 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}
}
// Old version schema charset maybe modified when load schema if TreatOldVersionUTF8AsUTF8MB4 was enable.
// So even if the origCharset equal toCharset, we still need to do the ddl for old version schema.
if origCharset == toCharset && origCollate == toCollate && tb.Meta().Version >= model.TableInfoVersion2 {
// nothing to do.
return nil
}

if err = modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate); err != nil {
return errors.Trace(err)
doNothing, err := checkAlterTableCharset(tb.Meta(), schema, toCharset, toCollate)
if err != nil {
return err
}

for _, col := range tb.Meta().Cols() {
if col.Tp == mysql.TypeVarchar {
if err = IsTooBigFieldLength(col.Flen, col.Name.O, toCharset); err != nil {
return errors.Trace(err)
}
}
if doNothing {
return nil
}

job := &model.Job{
Expand All @@ -2197,6 +2212,64 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// checkAlterTableCharset uses to check is it possible to change the charset of table.
// This function returns 2 variable:
// doNothing: if doNothing is true, means no need to change any more, because the target charset is same with the charset of table.
// err: if err is not nil, means it is not possible to change table charset to target charset.
func checkAlterTableCharset(tblInfo *model.TableInfo, dbInfo *model.DBInfo, toCharset, toCollate string) (doNothing bool, err error) {
origCharset := tblInfo.Charset
origCollate := tblInfo.Collate
// Old version schema charset maybe modified when load schema if TreatOldVersionUTF8AsUTF8MB4 was enable.
// So even if the origCharset equal toCharset, we still need to do the ddl for old version schema.
if origCharset == toCharset && origCollate == toCollate && tblInfo.Version >= model.TableInfoVersion2 {
// nothing to do.
doNothing = true
for _, col := range tblInfo.Columns {
if col.Charset == charset.CharsetBin {
continue
}
if col.Charset == toCharset && col.Collate == toCollate {
continue
}
doNothing = false
}
if doNothing {
return doNothing, nil
}
}

if len(origCharset) == 0 {
// The table charset may be "", if the table is create in old TiDB version, such as v2.0.8.
// This DDL will update the table charset to default charset.
origCharset, origCollate, err = ResolveCharsetCollation("", dbInfo.Charset)
if err != nil {
return doNothing, err
}
}

if err = modifiableCharsetAndCollation(toCharset, toCollate, origCharset, origCollate); err != nil {
return doNothing, err
}

for _, col := range tblInfo.Columns {
if col.Tp == mysql.TypeVarchar {
if err = IsTooBigFieldLength(col.Flen, col.Name.O, toCharset); err != nil {
return doNothing, err
}
}
if col.Charset == charset.CharsetBin {
continue
}
if len(col.Charset) == 0 {
continue
}
if err = modifiableCharsetAndCollation(toCharset, toCollate, col.Charset, col.Collate); err != nil {
return doNothing, err
}
}
return doNothing, nil
}

// RenameIndex renames an index.
// In TiDB, indexes are case-insensitive (so index 'a' and 'A" are considered the same index),
// but index names are case-sensitive (we can rename index 'a' to 'A')
Expand Down
12 changes: 12 additions & 0 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ func onModifySchemaCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _
return ver, nil
}

func checkSchemaExistAndCancelNotExistJob(t *meta.Meta, job *model.Job) (*model.DBInfo, error) {
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
return nil, errors.Trace(err)
}
if dbInfo == nil {
job.State = model.JobStateCancelled
return nil, infoschema.ErrDatabaseDropExists.GenWithStackByArgs("")
}
return dbInfo, nil
}

func getIDs(tables []*model.TableInfo) []int64 {
ids := make([]int64, 0, len(tables))
for _, t := range tables {
Expand Down
Loading