From f8e909aa2f3083484d27012cf1779e6229364b27 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 23 Sep 2020 14:44:34 +0800 Subject: [PATCH 01/19] use OPTIMISTIC transaction as the default --- dm/config/task.go | 23 +++++++++++++++++++++-- dm/config/task_test.go | 31 +++++++++++++++++++++++++++---- 2 files changed, 48 insertions(+), 6 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 9ae394838f..17439d42c3 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -41,8 +41,10 @@ const ( // shard DDL mode. const ( - ShardPessimistic = "pessimistic" - ShardOptimistic = "optimistic" + ShardPessimistic = "pessimistic" + ShardOptimistic = "optimistic" + tidbTxnMode = "tidb_txn_mode" + tidbTxnOptimistic = "optimistic" ) // default config item values @@ -66,6 +68,11 @@ var ( defaultBatch = 100 defaultQueueSize = 1024 // do not give too large default value to avoid OOM defaultCheckpointFlushInterval = 30 // in seconds + + // TargetDBConfig + defaultSessionCfg = map[string]string{ + tidbTxnMode: tidbTxnOptimistic, + } ) // Meta represents binlog's meta pos @@ -411,6 +418,7 @@ func (c *TaskConfig) adjust() error { if c.TargetDB == nil { return terror.ErrConfigNeedTargetDB.Generate() } + adjustTargetDB(c.TargetDB) if len(c.MySQLInstances) == 0 { return terror.ErrConfigMySQLInstsAtLeastOne.Generate() @@ -707,3 +715,14 @@ func checkDuplicateString(ruleNames []string) []string { } return dupeArray } + +func adjustTargetDB(dbConfig *DBConfig) { + if dbConfig.Session == nil { + dbConfig.Session = make(map[string]string, len(defaultSessionCfg)) + } + for k, v := range defaultSessionCfg { + if _, ok := dbConfig.Session[k]; !ok { + dbConfig.Session[k] = v + } + } +} diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 4ea78be3bf..90a4d5a895 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -324,7 +324,10 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { heartbeatRI = 21 timezone = "Asia/Shanghai" maxAllowedPacket = 10244201 - session = map[string]string{ + fromSession = map[string]string{ + "sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES", + } + toSession = map[string]string{ "sql_mode": " NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES", } security = Security{ @@ -380,7 +383,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { User: "user_from_1", Password: "123", MaxAllowedPacket: &maxAllowedPacket, - Session: session, + Session: fromSession, Security: &security, RawDBCfg: &rawDBCfg, } @@ -390,7 +393,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { User: "user_from_2", Password: "abc", MaxAllowedPacket: &maxAllowedPacket, - Session: session, + Session: fromSession, Security: &security, RawDBCfg: &rawDBCfg, } @@ -421,7 +424,7 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { User: "user_to", Password: "abc", MaxAllowedPacket: &maxAllowedPacket, - Session: session, + Session: toSession, Security: &security, RawDBCfg: &rawDBCfg, }, @@ -576,6 +579,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { stCfgs[0].EnableANSIQuotes = stCfg1.EnableANSIQuotes stCfgs[1].EnableANSIQuotes = stCfg2.EnableANSIQuotes c.Assert(stCfgs[0].String(), Equals, stCfg1.String()) + // subtask session cfg also changed because we ref DBConfig when merge from subtask config + stCfg2.To.Session[tidbTxnMode] = tidbTxnOptimistic c.Assert(stCfgs[1].String(), Equals, stCfg2.String()) } @@ -645,3 +650,21 @@ func (t *testConfig) TestMySQLInstance(c *C) { c.Assert(m.VerifyAndAdjust(), IsNil) } + +func (t *testConfig) TestAdjustSessionCfg(c *C) { + sessionCfg := map[string]string{tidbTxnMode: tidbTxnOptimistic} + dbCfg := &DBConfig{} + adjustTargetDB(dbCfg) + c.Assert(dbCfg.Session, DeepEquals, sessionCfg) + + sessionCfg["sql_mode"] = "ANSI_QUOTES" + dbCfg.Session["sql_mode"] = "ANSI_QUOTES" + adjustTargetDB(dbCfg) + c.Assert(dbCfg.Session, DeepEquals, sessionCfg) + + tidbTxnPessimistic := "pessimistic" + sessionCfg[tidbTxnMode] = tidbTxnPessimistic + dbCfg.Session[tidbTxnMode] = tidbTxnPessimistic + adjustTargetDB(dbCfg) + c.Assert(dbCfg.Session, DeepEquals, sessionCfg) +} From f1eab4a4c4572301dc46c96508d243600b9739e5 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 23 Sep 2020 15:32:47 +0800 Subject: [PATCH 02/19] get variables if user doesn't specify --- pkg/schema/tracker.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/schema/tracker.go b/pkg/schema/tracker.go index 8dc3bf8e60..eeeeb5d397 100644 --- a/pkg/schema/tracker.go +++ b/pkg/schema/tracker.go @@ -63,8 +63,11 @@ func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker if len(sessionCfg) == 0 { sessionCfg = make(map[string]string) - var ignoredColumn interface{} - for _, k := range sessionVars { + } + // get variables if user doesn't specify + for _, k := range sessionVars { + if _, ok := sessionCfg[k]; !ok { + var ignoredColumn interface{} rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k)) if err2 != nil { return nil, err2 From 130552805078797d2c4f43991f37a0c54e3859c4 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 23 Sep 2020 16:21:05 +0800 Subject: [PATCH 03/19] fix ut --- pkg/schema/tracker.go | 2 +- syncer/checkpoint_test.go | 5 ++++- syncer/syncer_test.go | 5 ++++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/pkg/schema/tracker.go b/pkg/schema/tracker.go index eeeeb5d397..52acb84a19 100644 --- a/pkg/schema/tracker.go +++ b/pkg/schema/tracker.go @@ -68,7 +68,7 @@ func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker for _, k := range sessionVars { if _, ok := sessionCfg[k]; !ok { var ignoredColumn interface{} - rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("show variables like '%s'", k)) + rows, err2 := tidbConn.QuerySQL(tcontext.Background(), fmt.Sprintf("SHOW VARIABLES LIKE '%s'", k)) if err2 != nil { return nil, err2 } diff --git a/syncer/checkpoint_test.go b/syncer/checkpoint_test.go index 89ae39cf79..cc23bc7bd4 100644 --- a/syncer/checkpoint_test.go +++ b/syncer/checkpoint_test.go @@ -70,7 +70,10 @@ func (s *testCheckpointSuite) SetUpSuite(c *C) { log.SetLevel(zapcore.ErrorLevel) var ( err error - defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"} + defaultTestSessionCfg = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } ) s.tracker, err = schema.NewTracker(defaultTestSessionCfg, nil) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 55c0d588eb..fb062fc4ee 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -59,7 +59,10 @@ import ( var _ = Suite(&testSyncerSuite{}) var ( - defaultTestSessionCfg = map[string]string{"sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION"} + defaultTestSessionCfg = map[string]string{ + "sql_mode": "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION", + "tidb_skip_utf8_check": "0", + } ) func TestSuite(t *testing.T) { From 1c25c2f99a9347c8262cf0c108f91d1fedb2f57a Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Wed, 23 Sep 2020 17:43:25 +0800 Subject: [PATCH 04/19] address comment --- dm/config/task.go | 12 ++++++++---- dm/config/task_test.go | 2 +- pkg/schema/tracker.go | 1 + pkg/schema/tracker_test.go | 6 +++--- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 17439d42c3..960e1d8515 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -717,12 +717,16 @@ func checkDuplicateString(ruleNames []string) []string { } func adjustTargetDB(dbConfig *DBConfig) { - if dbConfig.Session == nil { - dbConfig.Session = make(map[string]string, len(defaultSessionCfg)) + lowerMap := make(map[string]string, len(dbConfig.Session)) + for k, v := range dbConfig.Session { + lowerMap[strings.ToLower(k)] = v } + + // all cfg in defaultSessionCfg should be lower case for k, v := range defaultSessionCfg { - if _, ok := dbConfig.Session[k]; !ok { - dbConfig.Session[k] = v + if _, ok := lowerMap[k]; !ok { + lowerMap[k] = v } } + dbConfig.Session = lowerMap } diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 90a4d5a895..441807b911 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -658,7 +658,7 @@ func (t *testConfig) TestAdjustSessionCfg(c *C) { c.Assert(dbCfg.Session, DeepEquals, sessionCfg) sessionCfg["sql_mode"] = "ANSI_QUOTES" - dbCfg.Session["sql_mode"] = "ANSI_QUOTES" + dbCfg.Session["SQL_MODE"] = "ANSI_QUOTES" adjustTargetDB(dbCfg) c.Assert(dbCfg.Session, DeepEquals, sessionCfg) diff --git a/pkg/schema/tracker.go b/pkg/schema/tracker.go index 52acb84a19..fff450e46a 100644 --- a/pkg/schema/tracker.go +++ b/pkg/schema/tracker.go @@ -65,6 +65,7 @@ func NewTracker(sessionCfg map[string]string, tidbConn *conn.BaseConn) (*Tracker sessionCfg = make(map[string]string) } // get variables if user doesn't specify + // all cfg in sessionVars should be lower case for _, k := range sessionVars { if _, ok := sessionCfg[k]; !ok { var ignoredColumn interface{} diff --git a/pkg/schema/tracker_test.go b/pkg/schema/tracker_test.go index 8a240b0dd5..b33c5c67f4 100644 --- a/pkg/schema/tracker_test.go +++ b/pkg/schema/tracker_test.go @@ -81,14 +81,14 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { c.Assert(err, NotNil) // discover session config failed, will return error - mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows( + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "HaHa")) _, err = NewTracker(nil, baseConn) c.Assert(err, NotNil) // empty or default config in downstream - mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows( + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "")) tracker, err := NewTracker(nil, baseConn) @@ -98,7 +98,7 @@ func (s *trackerSuite) TestTiDBAndSessionCfg(c *C) { c.Assert(err, IsNil) // found session config in downstream - mock.ExpectQuery("show variables like 'sql_mode'").WillReturnRows( + mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows( sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_DATE,NO_ZERO_IN_DATE")) tracker, err = NewTracker(nil, baseConn) From a50cd9f79061e9d9b60c278097aa7894213c5dcc Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 24 Sep 2020 18:38:34 +0800 Subject: [PATCH 05/19] apply default session config base on tidb's version --- dm/config/task.go | 20 +++++++++------ dm/config/task_test.go | 20 --------------- dm/master/server.go | 24 ++++++++++++++++++ dm/master/server_test.go | 53 ++++++++++++++++++++++++++++++++++++++++ pkg/utils/db.go | 44 +++++++++++++++++++++++++++++++++ 5 files changed, 133 insertions(+), 28 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 960e1d8515..9b18d7a457 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" "github.com/dustin/go-humanize" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" @@ -70,8 +71,12 @@ var ( defaultCheckpointFlushInterval = 30 // in seconds // TargetDBConfig - defaultSessionCfg = map[string]string{ - tidbTxnMode: tidbTxnOptimistic, + defaultSessionCfg = []struct { + key string + val string + minVersion utils.TiDBVersion + }{ + {tidbTxnMode, tidbTxnOptimistic, utils.TiDBVersion{3, 0, 0}}, } ) @@ -418,7 +423,6 @@ func (c *TaskConfig) adjust() error { if c.TargetDB == nil { return terror.ErrConfigNeedTargetDB.Generate() } - adjustTargetDB(c.TargetDB) if len(c.MySQLInstances) == 0 { return terror.ErrConfigMySQLInstsAtLeastOne.Generate() @@ -716,16 +720,16 @@ func checkDuplicateString(ruleNames []string) []string { return dupeArray } -func adjustTargetDB(dbConfig *DBConfig) { +// AdjustTargetDBSessionCfg adjust session cfg of TiDB +func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version utils.TiDBVersion) { lowerMap := make(map[string]string, len(dbConfig.Session)) for k, v := range dbConfig.Session { lowerMap[strings.ToLower(k)] = v } - // all cfg in defaultSessionCfg should be lower case - for k, v := range defaultSessionCfg { - if _, ok := lowerMap[k]; !ok { - lowerMap[k] = v + for _, cfg := range defaultSessionCfg { + if _, ok := lowerMap[cfg.key]; !ok && version.Ge(cfg.minVersion) { + lowerMap[cfg.key] = cfg.val } } dbConfig.Session = lowerMap diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 441807b911..57020ae460 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -579,8 +579,6 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) { stCfgs[0].EnableANSIQuotes = stCfg1.EnableANSIQuotes stCfgs[1].EnableANSIQuotes = stCfg2.EnableANSIQuotes c.Assert(stCfgs[0].String(), Equals, stCfg1.String()) - // subtask session cfg also changed because we ref DBConfig when merge from subtask config - stCfg2.To.Session[tidbTxnMode] = tidbTxnOptimistic c.Assert(stCfgs[1].String(), Equals, stCfg2.String()) } @@ -650,21 +648,3 @@ func (t *testConfig) TestMySQLInstance(c *C) { c.Assert(m.VerifyAndAdjust(), IsNil) } - -func (t *testConfig) TestAdjustSessionCfg(c *C) { - sessionCfg := map[string]string{tidbTxnMode: tidbTxnOptimistic} - dbCfg := &DBConfig{} - adjustTargetDB(dbCfg) - c.Assert(dbCfg.Session, DeepEquals, sessionCfg) - - sessionCfg["sql_mode"] = "ANSI_QUOTES" - dbCfg.Session["SQL_MODE"] = "ANSI_QUOTES" - adjustTargetDB(dbCfg) - c.Assert(dbCfg.Session, DeepEquals, sessionCfg) - - tidbTxnPessimistic := "pessimistic" - sessionCfg[tidbTxnMode] = tidbTxnPessimistic - dbCfg.Session[tidbTxnMode] = tidbTxnPessimistic - adjustTargetDB(dbCfg) - c.Assert(dbCfg.Session, DeepEquals, sessionCfg) -} diff --git a/dm/master/server.go b/dm/master/server.go index b0875a18c6..ee3e88788d 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1247,6 +1247,25 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro return cfgs, nil } +func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { + toDB, err := conn.DefaultDBProvider.Apply(*dbConfig) + if err != nil { + return err + } + + value, err := dbutil.ShowVersion(ctx, toDB.DB) + if err != nil { + return err + } + + version, err := utils.ToTiDBVersion(value) + // Do not adjust if not TiDB + if err == nil { + config.AdjustTargetDBSessionCfg(dbConfig, version) + } + return nil +} + // OperateSource will create or update an upstream source. func (s *Server) OperateSource(ctx context.Context, req *pb.OperateSourceRequest) (*pb.OperateSourceResponse, error) { var ( @@ -1403,6 +1422,11 @@ func (s *Server) generateSubTask(ctx context.Context, task string) (*config.Task return nil, nil, terror.WithClass(err, terror.ClassDMMaster) } + err = adjustTargetDB(ctx, cfg.TargetDB) + if err != nil { + return nil, nil, terror.WithClass(err, terror.ClassDMMaster) + } + sourceCfgs, err := s.getSourceConfigs(cfg.MySQLInstances) if err != nil { return nil, nil, err diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 7c8b46786a..73c14aa434 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -352,6 +352,12 @@ func (t *testMaster) TestCheckTask(c *check.C) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients) + mock := t.initMockDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err := server.CheckTask(context.Background(), &pb.CheckTaskRequest{ Task: taskConfig, }) @@ -369,6 +375,8 @@ func (t *testMaster) TestCheckTask(c *check.C) { // simulate invalid password returned from scheduler, but config was supported plaintext mysql password, so cfg.SubTaskConfigs will success ctx, cancel = context.WithCancel(context.Background()) server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "invalid-encrypt-password", t.workerClients) + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.CheckTask(context.Background(), &pb.CheckTaskRequest{ Task: taskConfig, }) @@ -402,6 +410,12 @@ func (t *testMaster) TestStartTask(c *check.C) { } server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req)) + mock := t.initMockDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.StartTask(context.Background(), req) c.Assert(err, check.IsNil) c.Assert(resp.Result, check.IsTrue) @@ -416,6 +430,8 @@ func (t *testMaster) TestStartTask(c *check.C) { // check start-task with an invalid source invalidSource := "invalid-source" + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{ Task: taskConfig, Sources: []string{invalidSource}, @@ -434,6 +450,8 @@ func (t *testMaster) TestStartTask(c *check.C) { defer func() { checker.CheckSyncConfigFunc = bakCheckSyncConfigFunc }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{ Task: taskConfig, Sources: sources, @@ -453,6 +471,13 @@ func (d *mockDBProvider) Apply(config config.DBConfig) (*conn.BaseDB, error) { return conn.NewBaseDB(d.db, func() {}), nil } +func (t *testMaster) initMockDB(c *check.C) sqlmock.Sqlmock { + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + conn.DefaultDBProvider = &mockDBProvider{db: db} + return mock +} + func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { ctrl := gomock.NewController(c) defer ctrl.Finish() @@ -505,6 +530,8 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) mock.ExpectBegin() mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -519,6 +546,12 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err + mock := t.initMockDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp1, err1 := server.StartTask(context.Background(), req) c.Assert(err1, check.IsNil) c.Assert(resp1.Result, check.IsFalse) @@ -592,6 +625,8 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { db, mock, err = sqlmock.New() c.Assert(err, check.IsNil) conn.DefaultDBProvider = &mockDBProvider{db: db} + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "10.0.0-MariaDB")) mock.ExpectBegin() mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -606,6 +641,12 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err + mock := t.initMockDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp1, err1 := server.StartTask(context.Background(), req) c.Assert(err1, check.IsNil) c.Assert(resp1.Result, check.IsFalse) @@ -688,6 +729,12 @@ func (t *testMaster) TestOperateTask(c *check.C) { sourceResps := []*pb.CommonWorkerResponse{{Result: true, Source: sources[0]}, {Result: true, Source: sources[1]}} server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", makeWorkerClientsForHandle(ctrl, taskName, sources, workers, startReq, pauseReq, resumeReq, stopReq1, stopReq2)) + mock := t.initMockDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) stResp, err := server.StartTask(context.Background(), startReq) c.Assert(err, check.IsNil) c.Assert(stResp.Result, check.IsTrue) @@ -1401,6 +1448,12 @@ func (t *testMaster) TestGetTaskCfg(c *check.C) { makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req)) // start task + mock := t.initMockDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() + mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err := server.StartTask(context.Background(), req) c.Assert(err, check.IsNil) c.Assert(resp.Result, check.IsTrue) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 01358bf76a..2a0420121b 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -38,6 +38,9 @@ import ( "github.com/pingcap/dm/pkg/terror" ) +// TiDBVersion represents TiDB version number. +type TiDBVersion [3]uint + var ( // for MariaDB, UUID set as `gtid_domain_id` + domainServerIDSeparator + `server_id` domainServerIDSeparator = "-" @@ -424,3 +427,44 @@ func GetGTID(db *sql.DB) (string, error) { val, err := GetGlobalVariable(db, "GTID_MODE") return val, err } + +// ToTiDBVersion convert string to TiDBVersion +// version format: 5.7.25-TiDB-v3.0.7 +func ToTiDBVersion(v string) (TiDBVersion, error) { + version := TiDBVersion{0, 0, 0} + tmp := strings.Split(v, "-") + if len(tmp) != 3 { + return version, errors.NotValidf("TiDB version %s", v) + } + + tmp = strings.Split(tmp[2], ".") + if len(tmp) != 3 { + return version, errors.NotValidf("TiDB version %s", v) + } + + if !strings.HasPrefix(tmp[0], "v") { + return version, errors.NotValidf("TiDB version %s", v) + } + tmp[0] = tmp[0][1:] + + for i := range tmp { + val, err := strconv.ParseUint(tmp[i], 10, 64) + if err != nil { + return version, errors.NotValidf("TiDB version %s", v) + } + version[i] = uint(val) + } + return version, nil +} + +// Ge means v >= min +func (v TiDBVersion) Ge(min TiDBVersion) bool { + for i := range v { + if v[i] > min[i] { + return true + } else if v[i] < min[i] { + return false + } + } + return true +} From 6a2961e9d859cd5d3d9edea6ac3503e7e30dc2e4 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Thu, 24 Sep 2020 19:11:53 +0800 Subject: [PATCH 06/19] decrypt password --- dm/config/subtask.go | 16 ++++++++++++++++ dm/master/server.go | 9 ++++++++- dm/master/server_test.go | 8 ++++---- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index edf2eda8a7..594323be76 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -378,6 +378,22 @@ func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) { return clone, nil } +// Clone returns a replica of DBConfig +func (db *DBConfig) Clone() (*DBConfig, error) { + content, err := db.Toml() + if err != nil { + return nil, err + } + + clone := &DBConfig{} + _, err = toml.Decode(content, clone) + if err != nil { + return nil, terror.ErrConfigTomlTransform.Delegate(err, "decode DB config from data") + } + + return clone, nil +} + // Clone returns a replica of SubTaskConfig func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) { content, err := c.Toml() diff --git a/dm/master/server.go b/dm/master/server.go index ee3e88788d..a4eabd600c 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1248,7 +1248,14 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro } func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { - toDB, err := conn.DefaultDBProvider.Apply(*dbConfig) + clone, err := dbConfig.Clone() + if err != nil { + return err + } + password := utils.DecryptOrPlaintext(clone.Password) + clone.Password = password + + toDB, err := conn.DefaultDBProvider.Apply(*clone) if err != nil { return err } diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 73c14aa434..176fc12749 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -546,11 +546,11 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err - mock := t.initMockDB(c) + mock2 := t.initMockDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + mock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp1, err1 := server.StartTask(context.Background(), req) c.Assert(err1, check.IsNil) @@ -641,11 +641,11 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err - mock := t.initMockDB(c) + mock2 := t.initMockDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + mock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp1, err1 := server.StartTask(context.Background(), req) c.Assert(err1, check.IsNil) From fd24e532faec8b3a715cc689c0210455fb2e5b4d Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 09:59:46 +0800 Subject: [PATCH 07/19] fix it --- tests/dmctl_basic/check_list/check_task.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/dmctl_basic/check_list/check_task.sh b/tests/dmctl_basic/check_list/check_task.sh index 29d01dd379..2d8530b200 100644 --- a/tests/dmctl_basic/check_list/check_task.sh +++ b/tests/dmctl_basic/check_list/check_task.sh @@ -32,5 +32,6 @@ function check_task_error_database_config() { run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "check-task $task_conf" \ "Access denied for user" 1 \ - "Please check the database config in configuration file" 1 + "Please check the database connection and the database config in configuration file" 1 + } From 328817b27d440bfb55017f3fe06c2df1376b3146 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 10:01:18 +0800 Subject: [PATCH 08/19] remove line --- tests/dmctl_basic/check_list/check_task.sh | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/dmctl_basic/check_list/check_task.sh b/tests/dmctl_basic/check_list/check_task.sh index 2d8530b200..c503470d37 100644 --- a/tests/dmctl_basic/check_list/check_task.sh +++ b/tests/dmctl_basic/check_list/check_task.sh @@ -33,5 +33,4 @@ function check_task_error_database_config() { "check-task $task_conf" \ "Access denied for user" 1 \ "Please check the database connection and the database config in configuration file" 1 - } From 81e12943078643c0363fd85f295c793ee0d8c1db Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 10:50:37 +0800 Subject: [PATCH 09/19] temp commit --- dm/master/bootstrap_test.go | 2 ++ dm/master/server.go | 5 ++++ dm/master/server_test.go | 56 ++++++++++++++++++++++++------------- 3 files changed, 44 insertions(+), 19 deletions(-) diff --git a/dm/master/bootstrap_test.go b/dm/master/bootstrap_test.go index 71ed09d962..25c1bcf4a0 100644 --- a/dm/master/bootstrap_test.go +++ b/dm/master/bootstrap_test.go @@ -157,6 +157,7 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) { err = s.waitWorkersReadyV1Import(tctx, cfgs) c.Assert(err, IsNil) + clearEtcdEnv(c) } func (t *testMaster) TestSubtaskCfgsStagesV1Import(c *C) { @@ -342,4 +343,5 @@ func (t *testMaster) TestSubtaskCfgsStagesV1Import(c *C) { c.Assert(err, ErrorMatches, ".*fail to get subtask config and stage.*") c.Assert(cfgs, HasLen, 0) c.Assert(stages, HasLen, 0) + clearEtcdEnv(c) } diff --git a/dm/master/server.go b/dm/master/server.go index a4eabd600c..2fb296b59e 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1236,12 +1236,15 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro return cfgs, err } if err = cfg.Adjust(fromDB.DB); err != nil { + fromDB.Close() return cfgs, err } if _, err = cfg.Yaml(); err != nil { + fromDB.Close() return cfgs, err } + fromDB.Close() cfgs[i] = cfg } return cfgs, nil @@ -1262,6 +1265,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { value, err := dbutil.ShowVersion(ctx, toDB.DB) if err != nil { + toDB.Close() return err } @@ -1270,6 +1274,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { if err == nil { config.AdjustTargetDBSessionCfg(dbConfig, version) } + toDB.Close() return nil } diff --git a/dm/master/server_test.go b/dm/master/server_test.go index 176fc12749..a7d1488613 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -352,7 +352,7 @@ func (t *testMaster) TestCheckTask(c *check.C) { var wg sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", t.workerClients) - mock := t.initMockDB(c) + mock := t.initVersionDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() @@ -375,6 +375,7 @@ func (t *testMaster) TestCheckTask(c *check.C) { // simulate invalid password returned from scheduler, but config was supported plaintext mysql password, so cfg.SubTaskConfigs will success ctx, cancel = context.WithCancel(context.Background()) server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "invalid-encrypt-password", t.workerClients) + mock = t.initVersionDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.CheckTask(context.Background(), &pb.CheckTaskRequest{ @@ -410,7 +411,7 @@ func (t *testMaster) TestStartTask(c *check.C) { } server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req)) - mock := t.initMockDB(c) + mock := t.initVersionDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() @@ -430,6 +431,7 @@ func (t *testMaster) TestStartTask(c *check.C) { // check start-task with an invalid source invalidSource := "invalid-source" + mock = t.initVersionDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{ @@ -450,6 +452,7 @@ func (t *testMaster) TestStartTask(c *check.C) { defer func() { checker.CheckSyncConfigFunc = bakCheckSyncConfigFunc }() + mock = t.initVersionDB(c) mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp, err = server.StartTask(context.Background(), &pb.StartTaskRequest{ @@ -463,18 +466,39 @@ func (t *testMaster) TestStartTask(c *check.C) { } type mockDBProvider struct { - db *sql.DB + verDB *sql.DB + db *sql.DB } // Apply will build BaseDB with DBConfig func (d *mockDBProvider) Apply(config config.DBConfig) (*conn.BaseDB, error) { + if d.verDB != nil { + verDB := d.verDB + d.verDB = nil + return conn.NewBaseDB(verDB, func() {}), nil + } return conn.NewBaseDB(d.db, func() {}), nil } +func (t *testMaster) initVersionDB(c *check.C) sqlmock.Sqlmock { + db, mock, err := sqlmock.New() + c.Assert(err, check.IsNil) + if mdbp, ok := conn.DefaultDBProvider.(*mockDBProvider); ok { + mdbp.verDB = db + } else { + conn.DefaultDBProvider = &mockDBProvider{verDB: db} + } + return mock +} + func (t *testMaster) initMockDB(c *check.C) sqlmock.Sqlmock { db, mock, err := sqlmock.New() c.Assert(err, check.IsNil) - conn.DefaultDBProvider = &mockDBProvider{db: db} + if mdbp, ok := conn.DefaultDBProvider.(*mockDBProvider); ok { + mdbp.db = db + } else { + conn.DefaultDBProvider = &mockDBProvider{db: db} + } return mock } @@ -524,14 +548,10 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { c.Assert(server.pessimist.Start(ctx, etcdTestCli), check.IsNil) c.Assert(server.optimist.Start(ctx, etcdTestCli), check.IsNil) - db, mock, err := sqlmock.New() - c.Assert(err, check.IsNil) - conn.DefaultDBProvider = &mockDBProvider{db: db} - defer func() { - conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} - }() - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + verMock := t.initVersionDB(c) + verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) + mock := t.initMockDB(c) mock.ExpectBegin() mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -546,7 +566,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err - mock2 := t.initMockDB(c) + mock2 := t.initVersionDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() @@ -622,10 +642,8 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { err = server.optimist.Start(ctx, etcdTestCli) c.Assert(err, check.IsNil) - db, mock, err = sqlmock.New() - c.Assert(err, check.IsNil) - conn.DefaultDBProvider = &mockDBProvider{db: db} - mock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + verMock = t.initMockDB(c) + verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "10.0.0-MariaDB")) mock.ExpectBegin() mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -641,7 +659,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err - mock2 := t.initMockDB(c) + mock2 := t.initVersionDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() @@ -729,7 +747,7 @@ func (t *testMaster) TestOperateTask(c *check.C) { sourceResps := []*pb.CommonWorkerResponse{{Result: true, Source: sources[0]}, {Result: true, Source: sources[1]}} server.scheduler, _ = testMockScheduler(ctx, &wg, c, sources, workers, "", makeWorkerClientsForHandle(ctrl, taskName, sources, workers, startReq, pauseReq, resumeReq, stopReq1, stopReq2)) - mock := t.initMockDB(c) + mock := t.initVersionDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() @@ -1448,7 +1466,7 @@ func (t *testMaster) TestGetTaskCfg(c *check.C) { makeWorkerClientsForHandle(ctrl, taskName, sources, workers, req)) // start task - mock := t.initMockDB(c) + mock := t.initVersionDB(c) defer func() { conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} }() From 6e1899d0a00aac60c6a9923ff7e7c28c33e71247 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 11:31:13 +0800 Subject: [PATCH 10/19] close db and fix test --- dm/master/server_test.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/dm/master/server_test.go b/dm/master/server_test.go index a7d1488613..51e7fb7ca3 100644 --- a/dm/master/server_test.go +++ b/dm/master/server_test.go @@ -465,19 +465,19 @@ func (t *testMaster) TestStartTask(c *check.C) { clearSchedulerEnv(c, cancel, &wg) } +// db use for remove data +// verDB user for show version type mockDBProvider struct { verDB *sql.DB db *sql.DB } -// Apply will build BaseDB with DBConfig +// return db if verDB was closed func (d *mockDBProvider) Apply(config config.DBConfig) (*conn.BaseDB, error) { - if d.verDB != nil { - verDB := d.verDB - d.verDB = nil - return conn.NewBaseDB(verDB, func() {}), nil + if err := d.verDB.Ping(); err != nil { + return conn.NewBaseDB(d.db, func() {}), nil } - return conn.NewBaseDB(d.db, func() {}), nil + return conn.NewBaseDB(d.verDB, func() {}), nil } func (t *testMaster) initVersionDB(c *check.C) sqlmock.Sqlmock { @@ -549,6 +549,9 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { c.Assert(server.optimist.Start(ctx, etcdTestCli), check.IsNil) verMock := t.initVersionDB(c) + defer func() { + conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} + }() verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) mock := t.initMockDB(c) @@ -566,11 +569,8 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err - mock2 := t.initVersionDB(c) - defer func() { - conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} - }() - mock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + verMock2 := t.initVersionDB(c) + verMock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp1, err1 := server.StartTask(context.Background(), req) c.Assert(err1, check.IsNil) @@ -642,9 +642,10 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { err = server.optimist.Start(ctx, etcdTestCli) c.Assert(err, check.IsNil) - verMock = t.initMockDB(c) + verMock = t.initVersionDB(c) verMock.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("version", "10.0.0-MariaDB")) + AddRow("version", "5.7.25-TiDB-v4.0.2")) + mock = t.initMockDB(c) mock.ExpectBegin() mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -659,11 +660,8 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) { defer wg.Done() time.Sleep(10 * time.Microsecond) // start another same task at the same time, should get err - mock2 := t.initVersionDB(c) - defer func() { - conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{} - }() - mock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). + vermock2 := t.initVersionDB(c) + vermock2.ExpectQuery("SHOW GLOBAL VARIABLES LIKE 'version'").WillReturnRows(sqlmock.NewRows([]string{"Variable_name", "Value"}). AddRow("version", "5.7.25-TiDB-v4.0.2")) resp1, err1 := server.StartTask(context.Background(), req) c.Assert(err1, check.IsNil) From 0dcceb392cc7dfcebe5e2924a17c35b86e440e07 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 13:27:31 +0800 Subject: [PATCH 11/19] add ut --- dm/config/subtask.go | 16 ------------ dm/config/task_test.go | 35 +++++++++++++++++++++++++++ dm/master/server.go | 10 +++----- pkg/utils/db_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 22 deletions(-) diff --git a/dm/config/subtask.go b/dm/config/subtask.go index 594323be76..edf2eda8a7 100644 --- a/dm/config/subtask.go +++ b/dm/config/subtask.go @@ -378,22 +378,6 @@ func (c *SubTaskConfig) DecryptPassword() (*SubTaskConfig, error) { return clone, nil } -// Clone returns a replica of DBConfig -func (db *DBConfig) Clone() (*DBConfig, error) { - content, err := db.Toml() - if err != nil { - return nil, err - } - - clone := &DBConfig{} - _, err = toml.Decode(content, clone) - if err != nil { - return nil, terror.ErrConfigTomlTransform.Delegate(err, "decode DB config from data") - } - - return clone, nil -} - // Clone returns a replica of SubTaskConfig func (c *SubTaskConfig) Clone() (*SubTaskConfig, error) { content, err := c.Toml() diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 57020ae460..8d372ba42c 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -19,6 +19,7 @@ import ( "sort" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" . "github.com/pingcap/check" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" @@ -648,3 +649,37 @@ func (t *testConfig) TestMySQLInstance(c *C) { c.Assert(m.VerifyAndAdjust(), IsNil) } + +func (t *testConfig) TestAdjustTargetDBConfig(c *C) { + testCases := []struct { + dbConfig DBConfig + result DBConfig + version utils.TiDBVersion + }{ + { + DBConfig{}, + DBConfig{Session: map[string]string{}}, + utils.TiDBVersion{0, 0, 0}, + }, + { + DBConfig{Session: map[string]string{"ANSI_QUOTES": ""}}, + DBConfig{Session: map[string]string{"ansi_quotes": ""}}, + utils.TiDBVersion{2, 0, 7}, + }, + { + DBConfig{}, + DBConfig{Session: map[string]string{tidbTxnMode: tidbTxnOptimistic}}, + utils.TiDBVersion{3, 0, 1}, + }, + { + DBConfig{Session: map[string]string{"ANSI_QUOTES": "", tidbTxnMode: "pessimistic"}}, + DBConfig{Session: map[string]string{"ansi_quotes": "", tidbTxnMode: "pessimistic"}}, + utils.TiDBVersion{4, 0, 2}, + }, + } + + for _, tc := range testCases { + AdjustTargetDBSessionCfg(&tc.dbConfig, tc.version) + c.Assert(tc.dbConfig, DeepEquals, tc.result) + } +} diff --git a/dm/master/server.go b/dm/master/server.go index 2fb296b59e..22c78ecef0 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1251,14 +1251,12 @@ func parseAndAdjustSourceConfig(contents []string) ([]*config.SourceConfig, erro } func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { - clone, err := dbConfig.Clone() - if err != nil { - return err + cfg := *dbConfig + if len(cfg.Password) > 0 { + cfg.Password = utils.DecryptOrPlaintext(cfg.Password) } - password := utils.DecryptOrPlaintext(clone.Password) - clone.Password = password - toDB, err := conn.DefaultDBProvider.Apply(*clone) + toDB, err := conn.DefaultDBProvider.Apply(cfg) if err != nil { return err } diff --git a/pkg/utils/db_test.go b/pkg/utils/db_test.go index 6192023436..cf335f37ce 100644 --- a/pkg/utils/db_test.go +++ b/pkg/utils/db_test.go @@ -15,10 +15,12 @@ package utils import ( "context" + "sort" "github.com/DATA-DOG/go-sqlmock" "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" + "github.com/pingcap/errors" tmysql "github.com/pingcap/parser/mysql" gmysql "github.com/siddontang/go-mysql/mysql" ) @@ -288,3 +290,56 @@ func newMysqlErr(number uint16, message string) *mysql.MySQLError { Message: message, } } + +func (t *testDBSuite) TestTiDBVersion(c *C) { + testCases := []struct { + version string + result TiDBVersion + err error + }{ + { + "wrong-version", + TiDBVersion{0, 0, 0}, + errors.NotValidf("TiDB version %s", "wrong-version"), + }, { + "5.7.31-log", + TiDBVersion{0, 0, 0}, + errors.NotValidf("TiDB version %s", "5.7.31-log"), + }, { + "5.7.25-TiDB-v3.1.2", + TiDBVersion{3, 1, 2}, + nil, + }, + } + + for _, tc := range testCases { + tidbVer, err := ToTiDBVersion(tc.version) + if tc.err != nil { + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, tc.err.Error()) + } else { + c.Assert(tidbVer, DeepEquals, tc.result) + } + } + + vers := []TiDBVersion{ + {3, 0, 0}, + {2, 1, 1}, + {2, 0, 2}, + {1, 2, 1}, + {1, 1, 0}, + {1, 1, 0}, + {1, 0, 1}, + {1, 0, 0}, + } + + clone := make([]TiDBVersion, 0, len(vers)) + for _, v := range vers { + clone = append(clone, v) + } + + sort.Slice(vers, func(i, j int) bool { + return vers[i].Ge(vers[j]) + }) + c.Assert(vers, DeepEquals, clone) +} From a07c91564845e21560acf1129620f07effaa108c Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 14:42:02 +0800 Subject: [PATCH 12/19] add it --- tests/all_mode/run.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 9c1b31693a..26f5ab1f55 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -92,6 +92,10 @@ function run() { # use sync_diff_inspector to check full dump loader check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + # check default session config + check_log_contain_with_retry '\\"tidb_txn_mode\\":\\"optimistic\\"' $WORK_DIR/worker1/log/dm-worker.log + check_log_contain_with_retry '\\"tidb_txn_mode\\":\\"optimistic\\"' $WORK_DIR/worker2/log/dm-worker.log + # restart dm-worker1 pkill -hup dm-worker1.toml 2>/dev/null || true wait_process_exit dm-worker1.toml From 614def35dd27cc78a4d11c733fc5219ddece6e01 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 15:12:05 +0800 Subject: [PATCH 13/19] debug ci --- dm/config/task.go | 1 + dm/master/server.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/dm/config/task.go b/dm/config/task.go index 9b18d7a457..a9db02bd2f 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -722,6 +722,7 @@ func checkDuplicateString(ruleNames []string) []string { // AdjustTargetDBSessionCfg adjust session cfg of TiDB func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version utils.TiDBVersion) { + log.L().Info("in adjust target db session cfg") lowerMap := make(map[string]string, len(dbConfig.Session)) for k, v := range dbConfig.Session { lowerMap[strings.ToLower(k)] = v diff --git a/dm/master/server.go b/dm/master/server.go index 22c78ecef0..55d5401af3 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1256,6 +1256,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { cfg.Password = utils.DecryptOrPlaintext(cfg.Password) } + log.L().Info("in adjust target db") toDB, err := conn.DefaultDBProvider.Apply(cfg) if err != nil { return err @@ -1266,6 +1267,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { toDB.Close() return err } + log.L().Info("get version: ", zap.String("tidb version", value)) version, err := utils.ToTiDBVersion(value) // Do not adjust if not TiDB From 512048a496ff730d45c6918377b0d821dbab9da0 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 15:52:30 +0800 Subject: [PATCH 14/19] support beta version --- dm/master/server.go | 1 + pkg/utils/db.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/dm/master/server.go b/dm/master/server.go index 55d5401af3..03452b43e8 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1270,6 +1270,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { log.L().Info("get version: ", zap.String("tidb version", value)) version, err := utils.ToTiDBVersion(value) + log.L().Warn("get tidb version", log.ShortError(err)) // Do not adjust if not TiDB if err == nil { config.AdjustTargetDBSessionCfg(dbConfig, version) diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 2a0420121b..f339973044 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -433,7 +433,7 @@ func GetGTID(db *sql.DB) (string, error) { func ToTiDBVersion(v string) (TiDBVersion, error) { version := TiDBVersion{0, 0, 0} tmp := strings.Split(v, "-") - if len(tmp) != 3 { + if len(tmp) < 3 { return version, errors.NotValidf("TiDB version %s", v) } From 9c4115b945dd686ecf32dd03917823d9ff6e93a0 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 16:28:42 +0800 Subject: [PATCH 15/19] Revert "debug ci" This reverts commit 614def35dd27cc78a4d11c733fc5219ddece6e01. --- dm/config/task.go | 1 - dm/master/server.go | 2 -- 2 files changed, 3 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index a9db02bd2f..9b18d7a457 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -722,7 +722,6 @@ func checkDuplicateString(ruleNames []string) []string { // AdjustTargetDBSessionCfg adjust session cfg of TiDB func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version utils.TiDBVersion) { - log.L().Info("in adjust target db session cfg") lowerMap := make(map[string]string, len(dbConfig.Session)) for k, v := range dbConfig.Session { lowerMap[strings.ToLower(k)] = v diff --git a/dm/master/server.go b/dm/master/server.go index 03452b43e8..33abe4297e 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1256,7 +1256,6 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { cfg.Password = utils.DecryptOrPlaintext(cfg.Password) } - log.L().Info("in adjust target db") toDB, err := conn.DefaultDBProvider.Apply(cfg) if err != nil { return err @@ -1267,7 +1266,6 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { toDB.Close() return err } - log.L().Info("get version: ", zap.String("tidb version", value)) version, err := utils.ToTiDBVersion(value) log.L().Warn("get tidb version", log.ShortError(err)) From ae64af7a93ee22c06a44a950e252ba7e9ba79f97 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 17:15:07 +0800 Subject: [PATCH 16/19] revert clearEecdEnv --- dm/master/bootstrap_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/dm/master/bootstrap_test.go b/dm/master/bootstrap_test.go index 25c1bcf4a0..71ed09d962 100644 --- a/dm/master/bootstrap_test.go +++ b/dm/master/bootstrap_test.go @@ -157,7 +157,6 @@ func (t *testMaster) TestWaitWorkersReadyV1Import(c *C) { err = s.waitWorkersReadyV1Import(tctx, cfgs) c.Assert(err, IsNil) - clearEtcdEnv(c) } func (t *testMaster) TestSubtaskCfgsStagesV1Import(c *C) { @@ -343,5 +342,4 @@ func (t *testMaster) TestSubtaskCfgsStagesV1Import(c *C) { c.Assert(err, ErrorMatches, ".*fail to get subtask config and stage.*") c.Assert(cfgs, HasLen, 0) c.Assert(stages, HasLen, 0) - clearEtcdEnv(c) } From f93a2844ca15af3788ddc19cce93cbe70b6a970b Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 25 Sep 2020 17:19:56 +0800 Subject: [PATCH 17/19] add beta version ut --- pkg/utils/db_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/utils/db_test.go b/pkg/utils/db_test.go index cf335f37ce..5058d24f17 100644 --- a/pkg/utils/db_test.go +++ b/pkg/utils/db_test.go @@ -309,6 +309,10 @@ func (t *testDBSuite) TestTiDBVersion(c *C) { "5.7.25-TiDB-v3.1.2", TiDBVersion{3, 1, 2}, nil, + }, { + "5.7.25-TiDB-v4.0.0-beta.2-1293-g0843f32c0-dirty", + TiDBVersion{4, 0, 0}, + nil, }, } From a3bdfb494c9aa5fc3e294ba73dff3e57a133bc82 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Sun, 27 Sep 2020 10:20:00 +0800 Subject: [PATCH 18/19] minor fix --- dm/master/server.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/dm/master/server.go b/dm/master/server.go index 7f8a4f54f2..5219d5f10e 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1051,20 +1051,20 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { if err != nil { return err } + defer toDB.Close() value, err := dbutil.ShowVersion(ctx, toDB.DB) if err != nil { - toDB.Close() return err } version, err := utils.ToTiDBVersion(value) - log.L().Warn("get tidb version", log.ShortError(err)) // Do not adjust if not TiDB if err == nil { config.AdjustTargetDBSessionCfg(dbConfig, version) + } else { + log.L().Warn("get tidb version", log.ShortError(err)) } - toDB.Close() return nil } From e3ca26caf481f97044e1819860b20c8d0b4dccc8 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Sun, 27 Sep 2020 11:57:52 +0800 Subject: [PATCH 19/19] address commment --- dm/config/task.go | 14 +++++----- dm/config/task_test.go | 24 ++++++++--------- dm/master/server.go | 2 +- go.mod | 1 + pkg/utils/db.go | 59 ++++++++++++------------------------------ pkg/utils/db_test.go | 44 +++++++++---------------------- 6 files changed, 50 insertions(+), 94 deletions(-) diff --git a/dm/config/task.go b/dm/config/task.go index 9b18d7a457..ff7236718c 100644 --- a/dm/config/task.go +++ b/dm/config/task.go @@ -23,13 +23,13 @@ import ( "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" - - "github.com/dustin/go-humanize" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/column-mapping" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" + + "github.com/coreos/go-semver/semver" + "github.com/dustin/go-humanize" "go.uber.org/zap" yaml "gopkg.in/yaml.v2" ) @@ -74,9 +74,9 @@ var ( defaultSessionCfg = []struct { key string val string - minVersion utils.TiDBVersion + minVersion *semver.Version }{ - {tidbTxnMode, tidbTxnOptimistic, utils.TiDBVersion{3, 0, 0}}, + {tidbTxnMode, tidbTxnOptimistic, semver.New("3.0.0")}, } ) @@ -721,14 +721,14 @@ func checkDuplicateString(ruleNames []string) []string { } // AdjustTargetDBSessionCfg adjust session cfg of TiDB -func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version utils.TiDBVersion) { +func AdjustTargetDBSessionCfg(dbConfig *DBConfig, version *semver.Version) { lowerMap := make(map[string]string, len(dbConfig.Session)) for k, v := range dbConfig.Session { lowerMap[strings.ToLower(k)] = v } // all cfg in defaultSessionCfg should be lower case for _, cfg := range defaultSessionCfg { - if _, ok := lowerMap[cfg.key]; !ok && version.Ge(cfg.minVersion) { + if _, ok := lowerMap[cfg.key]; !ok && !version.LessThan(*cfg.minVersion) { lowerMap[cfg.key] = cfg.val } } diff --git a/dm/config/task_test.go b/dm/config/task_test.go index 8d372ba42c..8d0f9fc56b 100644 --- a/dm/config/task_test.go +++ b/dm/config/task_test.go @@ -18,13 +18,13 @@ import ( "path" "sort" - "github.com/pingcap/dm/pkg/terror" - "github.com/pingcap/dm/pkg/utils" - . "github.com/pingcap/check" + "github.com/pingcap/dm/pkg/terror" bf "github.com/pingcap/tidb-tools/pkg/binlog-filter" "github.com/pingcap/tidb-tools/pkg/filter" router "github.com/pingcap/tidb-tools/pkg/table-router" + + "github.com/coreos/go-semver/semver" ) func (t *testConfig) TestInvalidTaskConfig(c *C) { @@ -654,27 +654,27 @@ func (t *testConfig) TestAdjustTargetDBConfig(c *C) { testCases := []struct { dbConfig DBConfig result DBConfig - version utils.TiDBVersion + version *semver.Version }{ { DBConfig{}, DBConfig{Session: map[string]string{}}, - utils.TiDBVersion{0, 0, 0}, + semver.New("0.0.0"), }, { - DBConfig{Session: map[string]string{"ANSI_QUOTES": ""}}, - DBConfig{Session: map[string]string{"ansi_quotes": ""}}, - utils.TiDBVersion{2, 0, 7}, + DBConfig{Session: map[string]string{"SQL_MODE": "ANSI_QUOTES"}}, + DBConfig{Session: map[string]string{"sql_mode": "ANSI_QUOTES"}}, + semver.New("2.0.7"), }, { DBConfig{}, DBConfig{Session: map[string]string{tidbTxnMode: tidbTxnOptimistic}}, - utils.TiDBVersion{3, 0, 1}, + semver.New("3.0.1"), }, { - DBConfig{Session: map[string]string{"ANSI_QUOTES": "", tidbTxnMode: "pessimistic"}}, - DBConfig{Session: map[string]string{"ansi_quotes": "", tidbTxnMode: "pessimistic"}}, - utils.TiDBVersion{4, 0, 2}, + DBConfig{Session: map[string]string{"SQL_MODE": "", tidbTxnMode: "pessimistic"}}, + DBConfig{Session: map[string]string{"sql_mode": "", tidbTxnMode: "pessimistic"}}, + semver.New("4.0.0-beta.2"), }, } diff --git a/dm/master/server.go b/dm/master/server.go index 5219d5f10e..cc387b9ecd 100644 --- a/dm/master/server.go +++ b/dm/master/server.go @@ -1058,7 +1058,7 @@ func adjustTargetDB(ctx context.Context, dbConfig *config.DBConfig) error { return err } - version, err := utils.ToTiDBVersion(value) + version, err := utils.ExtractTiDBVersion(value) // Do not adjust if not TiDB if err == nil { config.AdjustTargetDBSessionCfg(dbConfig, version) diff --git a/go.mod b/go.mod index fafe3b2d30..f516ee11b2 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/Masterminds/semver v1.5.0 // indirect github.com/chaos-mesh/go-sqlsmith v0.0.0-00010101000000-000000000000 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e + github.com/coreos/go-semver v0.3.0 github.com/docker/go-units v0.4.0 github.com/dustin/go-humanize v1.0.0 github.com/go-sql-driver/mysql v1.5.0 diff --git a/pkg/utils/db.go b/pkg/utils/db.go index f339973044..9dfee9d6d7 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -23,6 +23,7 @@ import ( "strings" "time" + "github.com/coreos/go-semver/semver" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -38,9 +39,6 @@ import ( "github.com/pingcap/dm/pkg/terror" ) -// TiDBVersion represents TiDB version number. -type TiDBVersion [3]uint - var ( // for MariaDB, UUID set as `gtid_domain_id` + domainServerIDSeparator + `server_id` domainServerIDSeparator = "-" @@ -428,43 +426,20 @@ func GetGTID(db *sql.DB) (string, error) { return val, err } -// ToTiDBVersion convert string to TiDBVersion -// version format: 5.7.25-TiDB-v3.0.7 -func ToTiDBVersion(v string) (TiDBVersion, error) { - version := TiDBVersion{0, 0, 0} - tmp := strings.Split(v, "-") - if len(tmp) < 3 { - return version, errors.NotValidf("TiDB version %s", v) - } - - tmp = strings.Split(tmp[2], ".") - if len(tmp) != 3 { - return version, errors.NotValidf("TiDB version %s", v) - } - - if !strings.HasPrefix(tmp[0], "v") { - return version, errors.NotValidf("TiDB version %s", v) - } - tmp[0] = tmp[0][1:] - - for i := range tmp { - val, err := strconv.ParseUint(tmp[i], 10, 64) - if err != nil { - return version, errors.NotValidf("TiDB version %s", v) - } - version[i] = uint(val) - } - return version, nil -} - -// Ge means v >= min -func (v TiDBVersion) Ge(min TiDBVersion) bool { - for i := range v { - if v[i] > min[i] { - return true - } else if v[i] < min[i] { - return false - } - } - return true +// ExtractTiDBVersion extract tidb's version +// version format: "5.7.25-TiDB-v3.0.0-beta-211-g09beefbe0-dirty" +// ^~~~~~~~~^ +func ExtractTiDBVersion(version string) (*semver.Version, error) { + versions := strings.Split(strings.TrimSuffix(version, "-dirty"), "-") + end := len(versions) + switch end { + case 3, 4: + case 5, 6: + end -= 2 + default: + return nil, errors.Errorf("not a valid TiDB version: %s", version) + } + rawVersion := strings.Join(versions[2:end], "-") + rawVersion = strings.TrimPrefix(rawVersion, "v") + return semver.NewVersion(rawVersion) } diff --git a/pkg/utils/db_test.go b/pkg/utils/db_test.go index 5058d24f17..fa66e1ada5 100644 --- a/pkg/utils/db_test.go +++ b/pkg/utils/db_test.go @@ -15,13 +15,14 @@ package utils import ( "context" - "sort" - "github.com/DATA-DOG/go-sqlmock" - "github.com/go-sql-driver/mysql" . "github.com/pingcap/check" "github.com/pingcap/errors" tmysql "github.com/pingcap/parser/mysql" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/coreos/go-semver/semver" + "github.com/go-sql-driver/mysql" gmysql "github.com/siddontang/go-mysql/mysql" ) @@ -294,30 +295,30 @@ func newMysqlErr(number uint16, message string) *mysql.MySQLError { func (t *testDBSuite) TestTiDBVersion(c *C) { testCases := []struct { version string - result TiDBVersion + result *semver.Version err error }{ { "wrong-version", - TiDBVersion{0, 0, 0}, - errors.NotValidf("TiDB version %s", "wrong-version"), + semver.New("0.0.0"), + errors.Errorf("not a valid TiDB version: %s", "wrong-version"), }, { "5.7.31-log", - TiDBVersion{0, 0, 0}, - errors.NotValidf("TiDB version %s", "5.7.31-log"), + semver.New("0.0.0"), + errors.Errorf("not a valid TiDB version: %s", "5.7.31-log"), }, { "5.7.25-TiDB-v3.1.2", - TiDBVersion{3, 1, 2}, + semver.New("3.1.2"), nil, }, { "5.7.25-TiDB-v4.0.0-beta.2-1293-g0843f32c0-dirty", - TiDBVersion{4, 0, 0}, + semver.New("4.0.00-beta.2"), nil, }, } for _, tc := range testCases { - tidbVer, err := ToTiDBVersion(tc.version) + tidbVer, err := ExtractTiDBVersion(tc.version) if tc.err != nil { c.Assert(err, NotNil) c.Assert(err.Error(), Equals, tc.err.Error()) @@ -325,25 +326,4 @@ func (t *testDBSuite) TestTiDBVersion(c *C) { c.Assert(tidbVer, DeepEquals, tc.result) } } - - vers := []TiDBVersion{ - {3, 0, 0}, - {2, 1, 1}, - {2, 0, 2}, - {1, 2, 1}, - {1, 1, 0}, - {1, 1, 0}, - {1, 0, 1}, - {1, 0, 0}, - } - - clone := make([]TiDBVersion, 0, len(vers)) - for _, v := range vers { - clone = append(clone, v) - } - - sort.Slice(vers, func(i, j int) bool { - return vers[i].Ge(vers[j]) - }) - c.Assert(vers, DeepEquals, clone) }