From d71b25ddcb15b2d7d25c5e6942c1db427cd77e45 Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Wed, 5 Aug 2020 17:48:58 +0800 Subject: [PATCH] feat: upgrade v1.0.x metadata tables (#858) --- _utils/terror_gen/errors_release.txt | 3 +- errors.toml | 8 +- pkg/binlog/reader/util.go | 136 +++++++++++ pkg/binlog/reader/util_test.go | 68 ++++++ pkg/terror/error_list.go | 8 +- pkg/v1dbschema/schema.go | 214 +++++++++++++++++ pkg/v1dbschema/schema_test.go | 217 ++++++++++++++++++ .../v106_syncer_checkpoint-schema.sql | 11 + .../v106_syncer_checkpoint.sql | 3 + .../v106_syncer_onlineddl-schema.sql | 8 + .../v106_syncer_onlineddl.sql | 3 + 11 files changed, 676 insertions(+), 3 deletions(-) create mode 100644 pkg/binlog/reader/util.go create mode 100644 pkg/binlog/reader/util_test.go create mode 100644 pkg/v1dbschema/schema.go create mode 100644 pkg/v1dbschema/schema_test.go create mode 100644 pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql create mode 100644 pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql create mode 100644 pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql create mode 100644 pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 079d7a6749..f33a27fa53 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -118,7 +118,8 @@ ErrShardDDLOptimismTrySyncFail,[code=11111:class=functional:scope=internal:level ErrConnInvalidTLSConfig,[code=11112:class=functional:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config." ErrConnRegistryTLSConfig,[code=11113:class=functional:scope=internal:level=medium], "Message: fail to registry TLS config" ErrUpgradeVersionEtcdFail,[code=11114:class=functional:scope=internal:level=high], "Message: fail to operate DM cluster version in etcd, Workaround: Please use `list-member --master` to confirm whether the DM-master cluster is healthy" -ErrInvalidV1WorkerMetaPath,[code=11115:class=functional:scope=internal:level=medium], "Message: %s is an invalid v1.0.x DM-worker meta path, Workaround: Please check no `meta-dir` set for v1.0.x DM-worker" +ErrInvalidV1WorkerMetaPath,[code=11115:class=functional:scope=internal:level=medium], "Message: %s is an invalid v1.0.x DM-worker meta path, Workaround: Please check no `meta-dir` set for v1.0.x DM-worker." +ErrFailUpdateV1DBSchema,[code=11116:class=functional:scope=internal:level=medium], "Message: fail to upgrade v1.0.x DB schema, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation." ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`." ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format." ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format." diff --git a/errors.toml b/errors.toml index e7a50c345e..61e74b94f4 100644 --- a/errors.toml +++ b/errors.toml @@ -721,7 +721,13 @@ tags = ["internal", "high"] [error.DM-functional-11115] message = "%s is an invalid v1.0.x DM-worker meta path" description = "" -workaround = "Please check no `meta-dir` set for v1.0.x DM-worker" +workaround = "Please check no `meta-dir` set for v1.0.x DM-worker." +tags = ["internal", "medium"] + +[error.DM-functional-11116] +message = "fail to upgrade v1.0.x DB schema" +description = "" +workaround = "Please confirm that you have not violated any restrictions in the upgrade documentation." tags = ["internal", "medium"] [error.DM-config-20001] diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go new file mode 100644 index 0000000000..d0b8fc9a34 --- /dev/null +++ b/pkg/binlog/reader/util.go @@ -0,0 +1,136 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/parser" + uuid "github.com/satori/go.uuid" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/relay/common" +) + +// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). +// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. +// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. +func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) { + // start to get and parse binlog event from the beginning of the file. + startPos := gmysql.Position{ + Name: endPos.Name, + Pos: 0, + } + err := r.StartSyncByPos(startPos) + if err != nil { + return nil, err + } + defer r.Close() + + var ( + flavor string + latestPos uint32 + latestGSet gmysql.GTIDSet + nextGTIDStr string // can be recorded if the coming transaction completed + ) + for { + var e *replication.BinlogEvent + e, err = r.GetEvent(ctx) + if err != nil { + return nil, err + } + + // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. + switch ev := e.Event.(type) { + case *replication.QueryEvent: + isDDL := common.CheckIsDDL(string(ev.Query), parser2) + if isDDL { + if latestGSet == nil { + // GTID not enabled, can't get GTIDs for the position. + return nil, errors.Errorf("should have a GTIDEvent before the DDL QueryEvent %+v", e.Header) + } + err = latestGSet.Update(nextGTIDStr) + if err != nil { + return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr) + } + latestPos = e.Header.LogPos + } + case *replication.XIDEvent: + if latestGSet == nil { + // GTID not enabled, can't get GTIDs for the position. + return nil, errors.Errorf("should have a GTIDEvent before the XIDEvent %+v", e.Header) + } + err = latestGSet.Update(nextGTIDStr) + if err != nil { + return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr) + } + latestPos = e.Header.LogPos + case *replication.GTIDEvent: + if latestGSet == nil { + return nil, errors.Errorf("should have a PreviousGTIDsEvent before the GTIDEvent %+v", e.Header) + } + // learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736 + u, _ := uuid.FromBytes(ev.SID) + nextGTIDStr = fmt.Sprintf("%s:%d", u.String(), ev.GNO) + case *replication.MariadbGTIDEvent: + if latestGSet == nil { + return nil, errors.Errorf("should have a MariadbGTIDListEvent before the MariadbGTIDEvent %+v", e.Header) + } + // learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745 + GTID := ev.GTID + nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber) + case *replication.PreviousGTIDsEvent: + // if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent + // ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549 + // ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L5161 + var gSet gtid.Set + gSet, err = gtid.ParserGTID(gmysql.MySQLFlavor, ev.GTIDSets) + if err != nil { + return nil, err + } + latestGSet = gSet.Origin() + flavor = gmysql.MySQLFlavor + case *replication.MariadbGTIDListEvent: + // a MariadbGTIDListEvent logged in every binlog to record the current replication state if GTID enabled + // ref: https://mariadb.com/kb/en/library/gtid_list_event/ + gSet, err2 := event.GTIDsFromMariaDBGTIDListEvent(e) + if err2 != nil { + return nil, terror.Annotatef(err2, "get GTID set from MariadbGTIDListEvent %+v", e.Header) + } + latestGSet = gSet.Origin() + flavor = gmysql.MariaDBFlavor + } + + if latestPos == endPos.Pos { + // reach the end position, return the GTID sets. + if latestGSet == nil { + return nil, errors.Errorf("no GTIDs get for position %s", endPos) + } + var latestGTIDs gtid.Set + latestGTIDs, err = gtid.ParserGTID(flavor, latestGSet.String()) + if err != nil { + return nil, terror.Annotatef(err, "parse GTID set %s with flavor %s", latestGSet.String(), flavor) + } + return latestGTIDs, nil + } else if latestPos > endPos.Pos { + return nil, errors.Errorf("invalid position %s or GTID not enabled in upstream", endPos) + } + } +} diff --git a/pkg/binlog/reader/util_test.go b/pkg/binlog/reader/util_test.go new file mode 100644 index 0000000000..1412bdfaa8 --- /dev/null +++ b/pkg/binlog/reader/util_test.go @@ -0,0 +1,68 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "time" + + . "github.com/pingcap/check" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/utils" +) + +// added to testTCPReaderSuite to re-use DB connection. +func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[1], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + ) + defer cancel() + + endPos, endGS, err := utils.GetMasterStatus(t.db, flavor) + c.Assert(err, IsNil) + + parser2, err := utils.GetParser(t.db, false) + c.Assert(err, IsNil) + + r1 := NewTCPReader(cfg) + c.Assert(r1, NotNil) + defer r1.Close() + + gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2) + c.Assert(err, IsNil) + c.Assert(gs.Equal(endGS), IsTrue) + + // try to get for an invalid pos. + r2 := NewTCPReader(cfg) + c.Assert(r2, NotNil) + defer r2.Close() + gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{ + Name: endPos.Name, + Pos: endPos.Pos - 1, + }, parser2) + c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*") + c.Assert(gs, IsNil) +} diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 112f4cabc3..ed6a321b95 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -162,6 +162,9 @@ const ( // pkg/v1workermeta codeInvalidV1WorkerMetaPath + + // pkg/v1dbschema + codeFailUpdateV1DBSchema ) // Config related error code list @@ -728,7 +731,10 @@ var ( ErrUpgradeVersionEtcdFail = New(codeUpgradeVersionEtcdFail, ClassFunctional, ScopeInternal, LevelHigh, "fail to operate DM cluster version in etcd", "Please use `list-member --master` to confirm whether the DM-master cluster is healthy") // pkg/v1workermeta - ErrInvalidV1WorkerMetaPath = New(codeInvalidV1WorkerMetaPath, ClassFunctional, ScopeInternal, LevelMedium, "%s is an invalid v1.0.x DM-worker meta path", "Please check no `meta-dir` set for v1.0.x DM-worker") + ErrInvalidV1WorkerMetaPath = New(codeInvalidV1WorkerMetaPath, ClassFunctional, ScopeInternal, LevelMedium, "%s is an invalid v1.0.x DM-worker meta path", "Please check no `meta-dir` set for v1.0.x DM-worker.") + + // pkg/v1dbschema + ErrFailUpdateV1DBSchema = New(codeFailUpdateV1DBSchema, ClassFunctional, ScopeInternal, LevelMedium, "fail to upgrade v1.0.x DB schema", "Please confirm that you have not violated any restrictions in the upgrade documentation.") // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s", "Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`.") diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go new file mode 100644 index 0000000000..533183ef72 --- /dev/null +++ b/pkg/v1dbschema/schema.go @@ -0,0 +1,214 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1dbschema + +import ( + "fmt" + "strconv" + + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/parser" + "github.com/pingcap/tidb-tools/pkg/dbutil" + "github.com/pingcap/tidb/errno" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + "go.uber.org/zap" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/binlog/reader" + "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" +) + +// UpdateSchema updates the DB schema from v1.0.x to v2.0.x, including: +// - update checkpoint. +// - update online DDL meta. +func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskConfig) error { + // get db connection. + dbConn, err := db.GetBaseConn(tctx.Context()) + if err != nil { + return terror.ErrFailUpdateV1DBSchema.Delegate(err) + } + defer db.CloseBaseConn(dbConn) + + // setup SQL parser. + parser2, err := utils.GetParser(db.DB, cfg.EnableANSIQuotes) + if err != nil { + return terror.ErrFailUpdateV1DBSchema.Delegate(err) + } + + // setup a TCP binlog reader (because no relay can be used when upgrading). + syncCfg := replication.BinlogSyncerConfig{ + ServerID: cfg.ServerID, + Flavor: cfg.Flavor, + Host: cfg.From.Host, + Port: uint16(cfg.From.Port), + User: cfg.From.User, + Password: cfg.From.Password, // plaintext. + UseDecimal: true, + VerifyChecksum: true, + } + tcpReader := reader.NewTCPReader(syncCfg) + + // update checkpoint. + err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.SourceID, cfg.EnableGTID, tcpReader, parser2) + if err != nil { + return terror.ErrFailUpdateV1DBSchema.Delegate(err) + } + + // update online DDL meta. + err = updateSyncerOnlineDDLMeta(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)), cfg.SourceID, cfg.ServerID) + return terror.ErrFailUpdateV1DBSchema.Delegate(err) +} + +// updateSyncerCheckpoint updates the checkpoint table of sync unit, including: +// - update table schema: +// - add column `binlog_gtid VARCHAR(256)`. +// - add column `table_info JSON NOT NULL`. +// - update column value: +// - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable. +// NOTE: no need to update the value of `table_info` because DM can get schema automatically from downstream when replicating DML. +func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, fillGTIDs bool, tcpReader reader.Reader, parser2 *parser.Parser) error { + var gs gtid.Set + if fillGTIDs { + // NOTE: get GTID sets for all (global & tables) binlog position has many problems, at least including: + // - it is a heavy work because it should read binlog events once for each position + // - some binlog file for the position may have already been purge + // so we only get GTID sets for the global position now, + // and this should only have side effects for in-syncing shard tables, but we can mention and warn this case in the user docs. + pos, err := getGlobalPos(tctx, dbConn, tableName, sourceID) + if err != nil { + return terror.Annotatef(err, "get global checkpoint position for source %s", sourceID) + } + if pos.Name != "" { + gs, err = getGTIDsForPos(tctx, pos, tcpReader, parser2) + if err != nil { + return terror.Annotatef(err, "get GTID sets for position %s", pos) + } + } + } + + // try to add columns. + // NOTE: ignore already exists error to continue the process. + queries := []string{ + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, tableName), + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, tableName), + } + _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, queries) + if err != nil { + return terror.Annotatef(err, "add columns for checkpoint table") + } + + if fillGTIDs && gs != nil { + // set binlog_gtid, `gs` should valid here. + err = setGlobalGTIDs(tctx, dbConn, taskName, tableName, sourceID, gs.String()) + if err != nil { + return terror.Annotatef(err, "set GTID sets %s for checkpoint table", gs.String()) + } + } + return nil +} + +// updateSyncerOnlineDDLMeta updates the online DDL meta data, including: +// - update the value of `id` from `server-id` to `source-id`. +func updateSyncerOnlineDDLMeta(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, serverID uint32) error { + queries := []string{ + fmt.Sprintf(`UPDATE %s SET id=? WHERE id=?`, tableName), // for multiple columns. + } + args := []interface{}{sourceID, strconv.FormatUint(uint64(serverID), 10)} + _, err := dbConn.ExecuteSQL(tctx, nil, taskName, queries, args) + return terror.Annotatef(err, "update id column for online DDL meta table") +} + +// getGlobalPos tries to get the global checkpoint position. +func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sourceID string) (gmysql.Position, error) { + query := fmt.Sprintf(`SELECT binlog_name, binlog_pos FROM %s WHERE id=? AND is_global=? LIMIT 1`, tableName) + args := []interface{}{sourceID, true} + rows, err := dbConn.QuerySQL(tctx, query, args...) + if err != nil { + return gmysql.Position{}, err + } + defer rows.Close() + if !rows.Next() { + return gmysql.Position{}, nil // no global checkpoint position exists. + } + + var ( + name string + pos uint32 + ) + err = rows.Scan(&name, &pos) + if err != nil { + return gmysql.Position{}, err + } + + return gmysql.Position{ + Name: name, + Pos: pos, + }, rows.Err() +} + +// getGTIDsForPos gets the GTID sets for the position. +func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader, parser2 *parser.Parser) (gtid.Set, error) { + // NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream, + // we may encounter errors when reading binlog event but cleared by another test case. + failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) { + str := val.(string) + gs, _ := gtid.ParserGTID(gmysql.MySQLFlavor, str) + tctx.L().Info("set gs for position", zap.String("failpoint", "MockGetGTIDsForPos"), zap.Stringer("pos", pos)) + failpoint.Return(gs, nil) + }) + + realPos, err := binlog.RealMySQLPos(pos) + if err != nil { + return nil, err + } + gs, err := reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos, parser2) + if err != nil { + return nil, err + } + return gs, nil +} + +// setGlobalGTIDs tries to set `binlog_gtid` for the global checkpoint. +func setGlobalGTIDs(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID, gs string) error { + queries := []string{ + fmt.Sprintf(`UPDATE %s SET binlog_gtid=? WHERE id=? AND is_global=? LIMIT 1`, tableName), + } + args := []interface{}{gs, sourceID, true} + _, err := dbConn.ExecuteSQL(tctx, nil, taskName, queries, args) + return err +} + +func ignoreError(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*mysql.MySQLError) + if !ok { + return false + } + + switch mysqlErr.Number { + case errno.ErrDupFieldName: + return true + default: + return false + } +} diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go new file mode 100644 index 0000000000..c9e60bad7a --- /dev/null +++ b/pkg/v1dbschema/schema_test.go @@ -0,0 +1,217 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1dbschema + +import ( + "database/sql" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb-tools/pkg/dbutil" + gmysql "github.com/siddontang/go-mysql/mysql" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/gtid" +) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testSchema struct { + host string + port int + user string + password string + db *conn.BaseDB +} + +var _ = Suite(&testSchema{}) + +func (t *testSchema) SetUpSuite(c *C) { + t.setUpDBConn(c) +} + +func (t *testSchema) TestTearDown(c *C) { + t.db.Close() +} + +func (t *testSchema) setUpDBConn(c *C) { + t.host = os.Getenv("MYSQL_HOST") + if t.host == "" { + t.host = "127.0.0.1" + } + t.port, _ = strconv.Atoi(os.Getenv("MYSQL_PORT")) + if t.port == 0 { + t.port = 3306 + } + t.user = os.Getenv("MYSQL_USER") + if t.user == "" { + t.user = "root" + } + t.password = os.Getenv("MYSQL_PSWD") + + cfg := config.DBConfig{ + Host: t.host, + Port: t.port, + User: t.user, + Password: t.password, + Session: map[string]string{"sql_log_bin": "off"}, // do not enable binlog to break other unit test cases. + } + cfg.Adjust() + + var err error + t.db, err = conn.DefaultDBProvider.Apply(cfg) + c.Assert(err, IsNil) +} + +func (t *testSchema) TestSchemaV106ToV20x(c *C) { + var ( + _, currFile, _, _ = runtime.Caller(0) + v1DataDir = filepath.Join(filepath.Dir(currFile), "v106_data_for_test") + tctx = tcontext.Background() + + cfg = &config.SubTaskConfig{ + Name: "test", + SourceID: "mysql-replica-01", + ServerID: 429523137, + MetaSchema: "dm_meta_v106_test", + From: config.DBConfig{ + Host: t.host, + Port: t.port, + User: t.user, + Password: t.password, + }, + } + + endPos = gmysql.Position{ + Name: "mysql-bin|000001.000001", + Pos: 3574, + } + endGS, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "ccb992ad-a557-11ea-ba6a-0242ac140002:1-16") + ) + + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:1-16")`), IsNil) + defer failpoint.Disable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos") + + dbConn, err := t.db.GetBaseConn(tctx.Ctx) + c.Assert(err, IsNil) + + defer func() { + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + `DROP DATABASE ` + cfg.MetaSchema, + }) + }() + + // create metadata schema. + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + `CREATE DATABASE IF NOT EXISTS ` + cfg.MetaSchema, + }) + c.Assert(err, IsNil) + + // create v1.0.6 checkpoint table. + createCpV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint-schema.sql")) + c.Assert(err, IsNil) + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + string(createCpV106), + }) + c.Assert(err, IsNil) + + // create v1.0.6 online DDL metadata table. + createOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl-schema.sql")) + c.Assert(err, IsNil) + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + string(createOnV106), + }) + c.Assert(err, IsNil) + + // update position. + insertCpV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint.sql")) + c.Assert(err, IsNil) + insertCpV106s := strings.ReplaceAll(string(insertCpV106), "123456", strconv.FormatUint(uint64(endPos.Pos), 10)) + // load syncer checkpoint into table. + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + insertCpV106s, + }) + c.Assert(err, IsNil) + + // load online DDL metadata into table. + insertOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl.sql")) + c.Assert(err, IsNil) + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + string(insertOnV106), + }) + c.Assert(err, IsNil) + + // update schema without GTID enabled. + c.Assert(UpdateSchema(tctx, t.db, cfg), IsNil) + + // verify the column data of online DDL already updated. + rows, err := dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT count(*) FROM %s`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)))) + c.Assert(err, IsNil) + c.Assert(rows.Next(), IsTrue) + var count int + c.Assert(rows.Scan(&count), IsNil) + c.Assert(rows.Next(), IsFalse) + c.Assert(rows.Err(), IsNil) + rows.Close() + c.Assert(count, Equals, 2) + + // verify the column data of checkpoint not updated. + rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) + c.Assert(err, IsNil) + for rows.Next() { + var gs sql.NullString + c.Assert(rows.Scan(&gs), IsNil) + c.Assert(gs.Valid, IsFalse) + } + c.Assert(rows.Err(), IsNil) + rows.Close() + + // update schema with GTID enabled. + cfg.EnableGTID = true + c.Assert(UpdateSchema(tctx, t.db, cfg), IsNil) + + // verify the column data of global checkpoint already updated. + rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s WHERE is_global=1`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) + c.Assert(err, IsNil) + c.Assert(rows.Next(), IsTrue) + var gs sql.NullString + c.Assert(rows.Scan(&gs), IsNil) + c.Assert(rows.Next(), IsFalse) + c.Assert(rows.Err(), IsNil) + rows.Close() + c.Assert(gs.String, Equals, endGS.String()) + + rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s WHERE is_global!=1`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) + c.Assert(err, IsNil) + for rows.Next() { + var gs sql.NullString + c.Assert(rows.Scan(&gs), IsNil) + c.Assert(gs.Valid, IsFalse) + } + c.Assert(rows.Err(), IsNil) + rows.Close() +} diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql new file mode 100644 index 0000000000..3f5281d39c --- /dev/null +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql @@ -0,0 +1,11 @@ +CREATE TABLE IF NOT EXISTS `dm_meta_v106_test`.`test_syncer_checkpoint` ( + `id` varchar(32) NOT NULL, + `cp_schema` varchar(128) NOT NULL, + `cp_table` varchar(128) NOT NULL, + `binlog_name` varchar(128) DEFAULT NULL, + `binlog_pos` int(10) unsigned DEFAULT NULL, + `is_global` tinyint(1) DEFAULT NULL, + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY `uk_id_schema_table` (`id`,`cp_schema`,`cp_table`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql new file mode 100644 index 0000000000..dcb6f85350 --- /dev/null +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql @@ -0,0 +1,3 @@ +INSERT INTO `dm_meta_v106_test`.`test_syncer_checkpoint` VALUES +('mysql-replica-01','','','mysql-bin|000001.000001',123456,1,'2020-07-31 18:10:40','2020-07-31 18:11:40'), +('mysql-replica-01','db_single','t1','mysql-bin|000001.000001',123456,0,'2020-07-31 18:10:40','2020-07-31 18:10:40'); diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql new file mode 100644 index 0000000000..6c281dd8fd --- /dev/null +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS `dm_meta_v106_test`.`test_onlineddl` ( + `id` varchar(32) NOT NULL, + `ghost_schema` varchar(128) NOT NULL, + `ghost_table` varchar(128) NOT NULL, + `ddls` text DEFAULT NULL, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY `uk_id_schema_table` (`id`,`ghost_schema`,`ghost_table`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql new file mode 100644 index 0000000000..70098ad30d --- /dev/null +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql @@ -0,0 +1,3 @@ +INSERT INTO `dm_meta_v106_test`.`test_onlineddl` VALUES +('429523137','online_ddl','_t1_new','{\"schema\":\"online_ddl\",\"table\":\"t1\",\"ddls\":[\"ALTER TABLE `online_ddl`.`_t1_new` ADD COLUMN `age` INT\"]}','2020-08-04 12:05:48'), +('429523137','online_ddl','_t2_new','{\"schema\":\"online_ddl\",\"table\":\"t2\",\"ddls\":[\"ALTER TABLE `online_ddl`.`_t2_new` ADD COLUMN `age` INT\"]}','2020-08-04 12:05:49');