From 97ffafc638821f5e87ef10308a63e7643e023121 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 31 Jul 2020 18:28:42 +0800 Subject: [PATCH 01/10] v1schema: add test data for checkpoint --- .../dm_meta.task_single_syncer_checkpoint-schema.sql | 11 +++++++++++ .../dm_meta.task_single_syncer_checkpoint.sql | 3 +++ 2 files changed, 14 insertions(+) create mode 100644 dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql create mode 100644 dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql diff --git a/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql b/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql new file mode 100644 index 0000000000..41fbfcb5a3 --- /dev/null +++ b/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql @@ -0,0 +1,11 @@ +CREATE TABLE `dm_meta`.`task_single_syncer_checkpoint` ( + `id` varchar(32) NOT NULL, + `cp_schema` varchar(128) NOT NULL, + `cp_table` varchar(128) NOT NULL, + `binlog_name` varchar(128) DEFAULT NULL, + `binlog_pos` int(10) unsigned DEFAULT NULL, + `is_global` tinyint(1) DEFAULT NULL, + `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY `uk_id_schema_table` (`id`,`cp_schema`,`cp_table`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin diff --git a/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql b/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql new file mode 100644 index 0000000000..e1b6b4de49 --- /dev/null +++ b/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql @@ -0,0 +1,3 @@ +INSERT INTO `dm_meta`.`task_single_syncer_checkpoint` VALUES +('mysql-replica-01','','','mysql-bin|000001.000001',12641,1,'2020-07-31 18:10:40','2020-07-31 18:11:40'), +('mysql-replica-01','db_single','t1','mysql-bin|000001.000001',12610,0,'2020-07-31 18:10:40','2020-07-31 18:10:40'); From 9493838dbf631ca63676d8a6e61903fb9e6e5a27 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 3 Aug 2020 18:26:33 +0800 Subject: [PATCH 02/10] v1dbschema: add column for syncer checkpoint --- pkg/v1dbschema/schema.go | 71 ++++++++++++ pkg/v1dbschema/schema_test.go | 108 ++++++++++++++++++ .../v106_syncer_checkpoint-schema.sql | 2 +- .../v106_syncer_checkpoint.sql | 2 +- 4 files changed, 181 insertions(+), 2 deletions(-) create mode 100644 pkg/v1dbschema/schema.go create mode 100644 pkg/v1dbschema/schema_test.go rename dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql => pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql (87%) rename dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql => pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql (77%) diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go new file mode 100644 index 0000000000..fa4d9a78a0 --- /dev/null +++ b/pkg/v1dbschema/schema.go @@ -0,0 +1,71 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1dbschema + +import ( + "github.com/go-sql-driver/mysql" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/errno" + + "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" +) + +// UpdateSyncerCheckpoint updates the checkpoint table of sync unit, including: +// - update table schema: +// - add column `binlog_gtid VARCHAR(256)` +// - add column `table_info JSON NOT NULL` +// - update column value: +// - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable +func UpdateSyncerCheckpoint(tctx *tcontext.Context, taskName, tableName string, db *conn.BaseDB, fillGTIDs bool) error { + // get DB connection. + dbConn, err := db.GetBaseConn(tctx.Ctx) + if err != nil { + return err + } + + // try to add columns. + // NOTE: ignore already exists error to continue the process. + sqls := []string{ + `ALTER TABLE ` + tableName + ` ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, + `ALTER TABLE ` + tableName + ` ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, + } + + _, err = dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, sqls) + if err != nil { + return err + } + + if !fillGTIDs { + return nil + } + + // TODO(csuzhangxc): fill `binlog_gtid` based on `binlog_name` and `binlog_pos`. + return errors.New("Not Implemented") +} + +func ignoreError(err error) bool { + err = errors.Cause(err) // check the original error + mysqlErr, ok := err.(*mysql.MySQLError) + if !ok { + return false + } + + switch mysqlErr.Number { + case errno.ErrDupFieldName: + return true + default: + return false + } +} diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go new file mode 100644 index 0000000000..c36fbf147f --- /dev/null +++ b/pkg/v1dbschema/schema_test.go @@ -0,0 +1,108 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1dbschema + +import ( + "io/ioutil" + "os" + "path/filepath" + "runtime" + "strconv" + "testing" + + . "github.com/pingcap/check" + "github.com/pingcap/tidb-tools/pkg/dbutil" + + "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/conn" + tcontext "github.com/pingcap/dm/pkg/context" +) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testSchema struct{} + +var _ = Suite(&testSchema{}) + +func (t *testSchema) setUpDBConn(c *C) *conn.BaseDB { + host := os.Getenv("MYSQL_HOST") + if host == "" { + host = "127.0.0.1" + } + port, _ := strconv.Atoi(os.Getenv("MYSQL_PORT")) + if port == 0 { + port = 3306 + } + user := os.Getenv("MYSQL_USER") + if user == "" { + user = "root" + } + password := os.Getenv("MYSQL_PSWD") + + cfg := config.DBConfig{ + Host: host, + Port: port, + User: user, + Password: password, + Session: map[string]string{"sql_log_bin ": "off"}, // do not enable binlog to break other unit test cases. + } + cfg.Adjust() + + db, err := conn.DefaultDBProvider.Apply(cfg) + c.Assert(err, IsNil) + + return db +} + +func (t *testSchema) TestSchemaV106ToV20x(c *C) { + var ( + tctx = tcontext.Background() + taskName = "test" + dbName = "dm_meta_v106_test" + syncerCptableName = dbutil.TableName(dbName, taskName+"_syncer_checkpoint") + ) + + db := t.setUpDBConn(c) + defer db.Close() + dbConn, err := db.GetBaseConn(tctx.Ctx) + c.Assert(err, IsNil) + + defer func() { + _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + `DROP DATABASE ` + dbName, + }) + }() + + // create metadata schema. + _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + `CREATE DATABASE IF NOT EXISTS ` + dbName, + }) + c.Assert(err, IsNil) + + // create v1.0.6 checkpoint table. + _, currFile, _, _ := runtime.Caller(0) + v1DataDir := filepath.Join(filepath.Dir(currFile), "v106_data_for_test") + createV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint-schema.sql")) + c.Assert(err, IsNil) + _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + string(createV106), + }) + c.Assert(err, IsNil) + + c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCptableName, db, false), IsNil) + + c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCptableName, db, true), ErrorMatches, ".*Not Implemented.*") +} diff --git a/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql similarity index 87% rename from dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql rename to pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql index 41fbfcb5a3..3f5281d39c 100644 --- a/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint-schema.sql +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE `dm_meta`.`task_single_syncer_checkpoint` ( +CREATE TABLE IF NOT EXISTS `dm_meta_v106_test`.`test_syncer_checkpoint` ( `id` varchar(32) NOT NULL, `cp_schema` varchar(128) NOT NULL, `cp_table` varchar(128) NOT NULL, diff --git a/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql similarity index 77% rename from dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql rename to pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql index e1b6b4de49..4dff209f13 100644 --- a/dm/v1dbschema/v106_data_for_test/dm_meta.task_single_syncer_checkpoint.sql +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql @@ -1,3 +1,3 @@ -INSERT INTO `dm_meta`.`task_single_syncer_checkpoint` VALUES +INSERT INTO `dm_meta_v106_test`.`test_syncer_checkpoint` VALUES ('mysql-replica-01','','','mysql-bin|000001.000001',12641,1,'2020-07-31 18:10:40','2020-07-31 18:11:40'), ('mysql-replica-01','db_single','t1','mysql-bin|000001.000001',12610,0,'2020-07-31 18:10:40','2020-07-31 18:10:40'); From ae8460c4b7405fea0c983825d6045890f340b7d2 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 4 Aug 2020 14:30:22 +0800 Subject: [PATCH 03/10] v1dbschema: update id from server-id to source-id for online DDL meta --- pkg/v1dbschema/schema.go | 23 ++++++++++-- pkg/v1dbschema/schema_test.go | 35 +++++++++++++++---- .../v106_syncer_onlineddl-schema.sql | 8 +++++ .../v106_syncer_onlineddl.sql | 3 ++ 4 files changed, 60 insertions(+), 9 deletions(-) create mode 100644 pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql create mode 100644 pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index fa4d9a78a0..aaa1d11408 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -14,6 +14,7 @@ package v1dbschema import ( + "fmt" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" "github.com/pingcap/tidb/errno" @@ -38,8 +39,8 @@ func UpdateSyncerCheckpoint(tctx *tcontext.Context, taskName, tableName string, // try to add columns. // NOTE: ignore already exists error to continue the process. sqls := []string{ - `ALTER TABLE ` + tableName + ` ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, - `ALTER TABLE ` + tableName + ` ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, tableName), + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, tableName), } _, err = dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, sqls) @@ -55,6 +56,24 @@ func UpdateSyncerCheckpoint(tctx *tcontext.Context, taskName, tableName string, return errors.New("Not Implemented") } +// UpdateOnlineDDLMeta updates the online DDL meta data, including: +// - update the value of `id` from `server-id` to `source-id`. +func UpdateSyncerOnlineDDLMeta(tctx *tcontext.Context, taskName, sourceID, tableName string, serverID uint32, db *conn.BaseDB) error { + // get DB connection. + dbConn, err := db.GetBaseConn(tctx.Ctx) + if err != nil { + return err + } + + // update `id` from `server-id` to `source-id`. + sqls := []string{ + fmt.Sprintf(`UPDATE %s SET id=? WHERE id=?`, tableName), + } + args := []interface{}{sourceID, serverID} + _, err = dbConn.ExecuteSQL(tctx, nil, taskName, sqls, args) + return err +} + func ignoreError(err error) bool { err = errors.Cause(err) // check the original error mysqlErr, ok := err.(*mysql.MySQLError) diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index c36fbf147f..344cb1f1d8 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -69,10 +69,15 @@ func (t *testSchema) setUpDBConn(c *C) *conn.BaseDB { func (t *testSchema) TestSchemaV106ToV20x(c *C) { var ( + _, currFile, _, _ = runtime.Caller(0) + v1DataDir = filepath.Join(filepath.Dir(currFile), "v106_data_for_test") tctx = tcontext.Background() taskName = "test" + sourceID = "mysql-replica-01" + serverID = uint32(429523137) dbName = "dm_meta_v106_test" - syncerCptableName = dbutil.TableName(dbName, taskName+"_syncer_checkpoint") + syncerCpTableName = dbutil.TableName(dbName, taskName+"_syncer_checkpoint") + syncerOnTableName = dbutil.TableName(dbName, taskName+"_onlineddl") ) db := t.setUpDBConn(c) @@ -93,16 +98,32 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { c.Assert(err, IsNil) // create v1.0.6 checkpoint table. - _, currFile, _, _ := runtime.Caller(0) - v1DataDir := filepath.Join(filepath.Dir(currFile), "v106_data_for_test") - createV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint-schema.sql")) + createCpV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint-schema.sql")) c.Assert(err, IsNil) _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ - string(createV106), + string(createCpV106), }) c.Assert(err, IsNil) - c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCptableName, db, false), IsNil) + c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCpTableName, db, false), IsNil) - c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCptableName, db, true), ErrorMatches, ".*Not Implemented.*") + c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCpTableName, db, true), ErrorMatches, ".*Not Implemented.*") + + // create v1.0.6 online DDL metadata table. + createOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl-schema.sql")) + c.Assert(err, IsNil) + _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + string(createOnV106), + }) + c.Assert(err, IsNil) + + // load metadata into table. + insertOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl.sql")) + c.Assert(err, IsNil) + _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + string(insertOnV106), + }) + c.Assert(err, IsNil) + + c.Assert(UpdateSyncerOnlineDDLMeta(tctx, taskName, sourceID, syncerOnTableName, serverID, db), IsNil) } diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql new file mode 100644 index 0000000000..6c281dd8fd --- /dev/null +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl-schema.sql @@ -0,0 +1,8 @@ +CREATE TABLE IF NOT EXISTS `dm_meta_v106_test`.`test_onlineddl` ( + `id` varchar(32) NOT NULL, + `ghost_schema` varchar(128) NOT NULL, + `ghost_table` varchar(128) NOT NULL, + `ddls` text DEFAULT NULL, + `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + UNIQUE KEY `uk_id_schema_table` (`id`,`ghost_schema`,`ghost_table`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql new file mode 100644 index 0000000000..70098ad30d --- /dev/null +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_onlineddl.sql @@ -0,0 +1,3 @@ +INSERT INTO `dm_meta_v106_test`.`test_onlineddl` VALUES +('429523137','online_ddl','_t1_new','{\"schema\":\"online_ddl\",\"table\":\"t1\",\"ddls\":[\"ALTER TABLE `online_ddl`.`_t1_new` ADD COLUMN `age` INT\"]}','2020-08-04 12:05:48'), +('429523137','online_ddl','_t2_new','{\"schema\":\"online_ddl\",\"table\":\"t2\",\"ddls\":[\"ALTER TABLE `online_ddl`.`_t2_new` ADD COLUMN `age` INT\"]}','2020-08-04 12:05:49'); From d816cb68254497db265e3e9cd7d45148c536509c Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 4 Aug 2020 15:08:06 +0800 Subject: [PATCH 04/10] v1dbschema: refine code --- pkg/v1dbschema/schema.go | 72 +++++++++++++++++++---------------- pkg/v1dbschema/schema_test.go | 53 +++++++++++++++++--------- 2 files changed, 74 insertions(+), 51 deletions(-) diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index aaa1d11408..b4af50a5c5 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -15,62 +15,70 @@ package v1dbschema import ( "fmt" + "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb/errno" + "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/cputil" ) -// UpdateSyncerCheckpoint updates the checkpoint table of sync unit, including: -// - update table schema: -// - add column `binlog_gtid VARCHAR(256)` -// - add column `table_info JSON NOT NULL` -// - update column value: -// - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable -func UpdateSyncerCheckpoint(tctx *tcontext.Context, taskName, tableName string, db *conn.BaseDB, fillGTIDs bool) error { - // get DB connection. - dbConn, err := db.GetBaseConn(tctx.Ctx) +// UpdateSchema updates the DB schema from v1.0.x to v2.0.x, including: +// - update checkpoint. +// - update online DDL meta. +func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskConfig) error { + // get db connection. + dbConn, err := db.GetBaseConn(tctx.Context()) if err != nil { return err } - // try to add columns. - // NOTE: ignore already exists error to continue the process. - sqls := []string{ - fmt.Sprintf(`ALTER TABLE %s ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, tableName), - fmt.Sprintf(`ALTER TABLE %s ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, tableName), - } - - _, err = dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, sqls) + // update checkpoint. + err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.EnableGTID) if err != nil { return err } - if !fillGTIDs { - return nil + // update online DDL meta. + err = updateSyncerOnlineDDLMeta(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)), cfg.SourceID, cfg.ServerID) + return err +} + +// updateSyncerCheckpoint updates the checkpoint table of sync unit, including: +// - update table schema: +// - add column `binlog_gtid VARCHAR(256)`. +// - add column `table_info JSON NOT NULL`. +// - update column value: +// - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable. +// NOTE: no need to update the value of `table_info` because DM can get schema automatically from downstream when replicating DML. +func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName string, fillGTIDs bool) error { + if fillGTIDs { + // TODO(csuzhangxc): fill `binlog_gtid` based on `binlog_name` and `binlog_pos`. + return errors.New("Not Implemented") } - // TODO(csuzhangxc): fill `binlog_gtid` based on `binlog_name` and `binlog_pos`. - return errors.New("Not Implemented") + // try to add columns. + // NOTE: ignore already exists error to continue the process. + sqls := []string{ + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, tableName), + fmt.Sprintf(`ALTER TABLE %s ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, tableName), + } + _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, sqls) + return err } -// UpdateOnlineDDLMeta updates the online DDL meta data, including: +// updateSyncerOnlineDDLMeta updates the online DDL meta data, including: // - update the value of `id` from `server-id` to `source-id`. -func UpdateSyncerOnlineDDLMeta(tctx *tcontext.Context, taskName, sourceID, tableName string, serverID uint32, db *conn.BaseDB) error { - // get DB connection. - dbConn, err := db.GetBaseConn(tctx.Ctx) - if err != nil { - return err - } - - // update `id` from `server-id` to `source-id`. +func updateSyncerOnlineDDLMeta(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, serverID uint32) error { sqls := []string{ - fmt.Sprintf(`UPDATE %s SET id=? WHERE id=?`, tableName), + fmt.Sprintf(`UPDATE %s SET id=? WHERE id=?`, tableName), // for multiple columns. } args := []interface{}{sourceID, serverID} - _, err = dbConn.ExecuteSQL(tctx, nil, taskName, sqls, args) + _, err := dbConn.ExecuteSQL(tctx, nil, taskName, sqls, args) return err } diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index 344cb1f1d8..df245b7481 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -14,6 +14,7 @@ package v1dbschema import ( + "fmt" "io/ioutil" "os" "path/filepath" @@ -27,6 +28,7 @@ import ( "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" + "github.com/pingcap/dm/pkg/cputil" ) func TestSuite(t *testing.T) { @@ -72,12 +74,13 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { _, currFile, _, _ = runtime.Caller(0) v1DataDir = filepath.Join(filepath.Dir(currFile), "v106_data_for_test") tctx = tcontext.Background() - taskName = "test" - sourceID = "mysql-replica-01" - serverID = uint32(429523137) - dbName = "dm_meta_v106_test" - syncerCpTableName = dbutil.TableName(dbName, taskName+"_syncer_checkpoint") - syncerOnTableName = dbutil.TableName(dbName, taskName+"_onlineddl") + + cfg = &config.SubTaskConfig{ + Name: "test", + SourceID: "mysql-replica-01", + ServerID: 429523137, + MetaSchema: "dm_meta_v106_test", + } ) db := t.setUpDBConn(c) @@ -86,44 +89,56 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { c.Assert(err, IsNil) defer func() { - _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ - `DROP DATABASE ` + dbName, + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + `DROP DATABASE ` + cfg.MetaSchema, }) }() // create metadata schema. - _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ - `CREATE DATABASE IF NOT EXISTS ` + dbName, + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + `CREATE DATABASE IF NOT EXISTS ` + cfg.MetaSchema, }) c.Assert(err, IsNil) // create v1.0.6 checkpoint table. createCpV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint-schema.sql")) c.Assert(err, IsNil) - _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ string(createCpV106), }) c.Assert(err, IsNil) - c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCpTableName, db, false), IsNil) - - c.Assert(UpdateSyncerCheckpoint(tctx, taskName, syncerCpTableName, db, true), ErrorMatches, ".*Not Implemented.*") - // create v1.0.6 online DDL metadata table. createOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl-schema.sql")) c.Assert(err, IsNil) - _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ string(createOnV106), }) c.Assert(err, IsNil) - // load metadata into table. + // load online DDL metadata into table. insertOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl.sql")) c.Assert(err, IsNil) - _, err = dbConn.ExecuteSQL(tctx, nil, taskName, []string{ + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ string(insertOnV106), }) c.Assert(err, IsNil) - c.Assert(UpdateSyncerOnlineDDLMeta(tctx, taskName, sourceID, syncerOnTableName, serverID, db), IsNil) + // update schema without GTID enabled. + c.Assert(UpdateSchema(tctx, db, cfg), IsNil) + + // verify the column data of online DDL already updated. + rows, err := dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT count(*) FROM %s`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)))) + c.Assert(err, IsNil) + c.Assert(rows.Next(), IsTrue) + var count int + err = rows.Scan(&count) + c.Assert(err, IsNil) + c.Assert(count, Equals, 2) + c.Assert(rows.Next(), IsFalse) + c.Assert(rows.Err(), IsNil) + + // update schema with GTID enabled. + cfg.EnableGTID = true + c.Assert(UpdateSchema(tctx, db, cfg), ErrorMatches, ".*Not Implemented.*") } From 4a536b43c648e3145ac040fa7565d0e5ceca8cd3 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 4 Aug 2020 16:56:13 +0800 Subject: [PATCH 05/10] binlog reader: get GTID sets for the specified binlog position --- pkg/binlog/reader/util.go | 135 +++++++++++++++++++++++++++++++++ pkg/binlog/reader/util_test.go | 68 +++++++++++++++++ 2 files changed, 203 insertions(+) create mode 100644 pkg/binlog/reader/util.go create mode 100644 pkg/binlog/reader/util_test.go diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go new file mode 100644 index 0000000000..1a6cbb7fea --- /dev/null +++ b/pkg/binlog/reader/util.go @@ -0,0 +1,135 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "fmt" + + "github.com/pingcap/errors" + "github.com/pingcap/parser" + uuid "github.com/satori/go.uuid" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/relay/common" +) + +// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). +// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. +// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. +func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) { + // start to get and parse binlog event from the beginning of the file. + startPos := gmysql.Position{ + Name: endPos.Name, + Pos: 0, + } + err := r.StartSyncByPos(startPos) + if err != nil { + return nil, err + } + + var ( + flavor string + latestPos uint32 + latestGSet gmysql.GTIDSet + nextGTIDStr string // can be recorded if the coming transaction completed + ) + for { + var e *replication.BinlogEvent + e, err = r.GetEvent(ctx) + if err != nil { + return nil, err + } + + // NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. + switch ev := e.Event.(type) { + case *replication.QueryEvent: + isDDL := common.CheckIsDDL(string(ev.Query), parser2) + if isDDL { + if latestGSet == nil { + // GTID not enabled, can't get GTIDs for the position. + return nil, errors.Errorf("should have a GTIDEvent before the DDL QueryEvent %+v", e.Header) + } + err = latestGSet.Update(nextGTIDStr) + if err != nil { + return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr) + } + latestPos = e.Header.LogPos + } + case *replication.XIDEvent: + if latestGSet == nil { + // GTID not enabled, can't get GTIDs for the position. + return nil, errors.Errorf("should have a GTIDEvent before the XIDEvent %+v", e.Header) + } + err = latestGSet.Update(nextGTIDStr) + if err != nil { + return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr) + } + latestPos = e.Header.LogPos + case *replication.GTIDEvent: + if latestGSet == nil { + return nil, errors.Errorf("should have a PreviousGTIDsEvent before the GTIDEvent %+v", e.Header) + } + // learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736 + u, _ := uuid.FromBytes(ev.SID) + nextGTIDStr = fmt.Sprintf("%s:%d", u.String(), ev.GNO) + case *replication.MariadbGTIDEvent: + if latestGSet == nil { + return nil, errors.Errorf("should have a MariadbGTIDListEvent before the MariadbGTIDEvent %+v", e.Header) + } + // learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745 + GTID := ev.GTID + nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber) + case *replication.PreviousGTIDsEvent: + // if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent + // ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549 + // ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L5161 + var gSet gtid.Set + gSet, err = gtid.ParserGTID(gmysql.MySQLFlavor, ev.GTIDSets) + if err != nil { + return nil, err + } + latestGSet = gSet.Origin() + flavor = gmysql.MySQLFlavor + case *replication.MariadbGTIDListEvent: + // a MariadbGTIDListEvent logged in every binlog to record the current replication state if GTID enabled + // ref: https://mariadb.com/kb/en/library/gtid_list_event/ + gSet, err2 := event.GTIDsFromMariaDBGTIDListEvent(e) + if err2 != nil { + return nil, terror.Annotatef(err2, "get GTID set from MariadbGTIDListEvent %+v", e.Header) + } + latestGSet = gSet.Origin() + flavor = gmysql.MariaDBFlavor + } + + if latestPos == endPos.Pos { + // reach the end position, return the GTID sets. + if latestGSet == nil { + return nil, errors.Errorf("no GTIDs get for position %s", endPos) + } + var latestGTIDs gtid.Set + latestGTIDs, err = gtid.ParserGTID(flavor, latestGSet.String()) + if err != nil { + return nil, terror.Annotatef(err, "parse GTID set %s with flavor %s", latestGSet.String(), flavor) + } + return latestGTIDs, nil + } else if latestPos > endPos.Pos { + return nil, errors.Errorf("no GTIDs get for position %s", endPos) + } + } +} diff --git a/pkg/binlog/reader/util_test.go b/pkg/binlog/reader/util_test.go new file mode 100644 index 0000000000..777324ce34 --- /dev/null +++ b/pkg/binlog/reader/util_test.go @@ -0,0 +1,68 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "time" + + . "github.com/pingcap/check" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/utils" +) + +// added to testTCPReaderSuite to re-use DB connection. +func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { + var ( + cfg = replication.BinlogSyncerConfig{ + ServerID: serverIDs[1], + Flavor: flavor, + Host: t.host, + Port: uint16(t.port), + User: t.user, + Password: t.password, + UseDecimal: true, + VerifyChecksum: true, + } + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + ) + defer cancel() + + endPos, endGS, err := utils.GetMasterStatus(t.db, flavor) + c.Assert(err, IsNil) + + parser2, err := utils.GetParser(t.db, false) + c.Assert(err, IsNil) + + r1 := NewTCPReader(cfg) + c.Assert(r1, NotNil) + defer r1.Close() + + gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2) + c.Assert(err, IsNil) + c.Assert(gs.Equal(endGS), IsTrue) + + // try to get for an invalid pos. + r2 := NewTCPReader(cfg) + c.Assert(r2, NotNil) + defer r2.Close() + gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{ + Name: endPos.Name, + Pos: endPos.Pos - 1, + }, parser2) + c.Assert(err, ErrorMatches, ".*no GTIDs get for position.*") + c.Assert(gs, IsNil) +} From 8b854f084e8fa58b19352d7f90906b8e2a20808a Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 4 Aug 2020 18:46:51 +0800 Subject: [PATCH 06/10] v1dbschema: update binlog_gtid column --- _utils/terror_gen/errors_release.txt | 3 +- errors.toml | 8 +- pkg/terror/error_list.go | 8 +- pkg/v1dbschema/schema.go | 118 +++++++++++++++-- pkg/v1dbschema/schema_test.go | 121 ++++++++++++++---- .../v106_syncer_checkpoint.sql | 4 +- 6 files changed, 220 insertions(+), 42 deletions(-) diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 079d7a6749..f33a27fa53 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -118,7 +118,8 @@ ErrShardDDLOptimismTrySyncFail,[code=11111:class=functional:scope=internal:level ErrConnInvalidTLSConfig,[code=11112:class=functional:scope=internal:level=medium], "Message: invalid TLS config, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config." ErrConnRegistryTLSConfig,[code=11113:class=functional:scope=internal:level=medium], "Message: fail to registry TLS config" ErrUpgradeVersionEtcdFail,[code=11114:class=functional:scope=internal:level=high], "Message: fail to operate DM cluster version in etcd, Workaround: Please use `list-member --master` to confirm whether the DM-master cluster is healthy" -ErrInvalidV1WorkerMetaPath,[code=11115:class=functional:scope=internal:level=medium], "Message: %s is an invalid v1.0.x DM-worker meta path, Workaround: Please check no `meta-dir` set for v1.0.x DM-worker" +ErrInvalidV1WorkerMetaPath,[code=11115:class=functional:scope=internal:level=medium], "Message: %s is an invalid v1.0.x DM-worker meta path, Workaround: Please check no `meta-dir` set for v1.0.x DM-worker." +ErrFailUpdateV1DBSchema,[code=11116:class=functional:scope=internal:level=medium], "Message: fail to upgrade v1.0.x DB schema, Workaround: Please confirm that you have not violated any restrictions in the upgrade documentation." ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium], "Message: checking item %s is not supported\n%s, Workaround: Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`." ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct TOML format." ErrConfigYamlTransform,[code=20003:class=config:scope=internal:level=medium], "Message: %s, Workaround: Please check the configuration file has correct YAML format." diff --git a/errors.toml b/errors.toml index e7a50c345e..61e74b94f4 100644 --- a/errors.toml +++ b/errors.toml @@ -721,7 +721,13 @@ tags = ["internal", "high"] [error.DM-functional-11115] message = "%s is an invalid v1.0.x DM-worker meta path" description = "" -workaround = "Please check no `meta-dir` set for v1.0.x DM-worker" +workaround = "Please check no `meta-dir` set for v1.0.x DM-worker." +tags = ["internal", "medium"] + +[error.DM-functional-11116] +message = "fail to upgrade v1.0.x DB schema" +description = "" +workaround = "Please confirm that you have not violated any restrictions in the upgrade documentation." tags = ["internal", "medium"] [error.DM-config-20001] diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 112f4cabc3..ed6a321b95 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -162,6 +162,9 @@ const ( // pkg/v1workermeta codeInvalidV1WorkerMetaPath + + // pkg/v1dbschema + codeFailUpdateV1DBSchema ) // Config related error code list @@ -728,7 +731,10 @@ var ( ErrUpgradeVersionEtcdFail = New(codeUpgradeVersionEtcdFail, ClassFunctional, ScopeInternal, LevelHigh, "fail to operate DM cluster version in etcd", "Please use `list-member --master` to confirm whether the DM-master cluster is healthy") // pkg/v1workermeta - ErrInvalidV1WorkerMetaPath = New(codeInvalidV1WorkerMetaPath, ClassFunctional, ScopeInternal, LevelMedium, "%s is an invalid v1.0.x DM-worker meta path", "Please check no `meta-dir` set for v1.0.x DM-worker") + ErrInvalidV1WorkerMetaPath = New(codeInvalidV1WorkerMetaPath, ClassFunctional, ScopeInternal, LevelMedium, "%s is an invalid v1.0.x DM-worker meta path", "Please check no `meta-dir` set for v1.0.x DM-worker.") + + // pkg/v1dbschema + ErrFailUpdateV1DBSchema = New(codeFailUpdateV1DBSchema, ClassFunctional, ScopeInternal, LevelMedium, "fail to upgrade v1.0.x DB schema", "Please confirm that you have not violated any restrictions in the upgrade documentation.") // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s", "Please check `ignore-checking-items` config in task configuration file, which can be set including `all`/`dump_privilege`/`replication_privilege`/`version`/`binlog_enable`/`binlog_format`/`binlog_row_image`/`table_schema`/`schema_of_shard_tables`/`auto_increment_ID`.") diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index b4af50a5c5..747fd51515 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -18,13 +18,21 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/parser" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb/errno" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" "github.com/pingcap/dm/dm/config" + "github.com/pingcap/dm/pkg/binlog" + "github.com/pingcap/dm/pkg/binlog/reader" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/pkg/utils" ) // UpdateSchema updates the DB schema from v1.0.x to v2.0.x, including: @@ -34,18 +42,37 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo // get db connection. dbConn, err := db.GetBaseConn(tctx.Context()) if err != nil { - return err + return terror.ErrFailUpdateV1DBSchema.Delegate(err) + } + + // setup SQL parser. + parser2, err := utils.GetParser(db.DB, cfg.EnableANSIQuotes) + if err != nil { + return terror.ErrFailUpdateV1DBSchema.Delegate(err) } + // setup a TCP binlog reader (because no relay can be used when upgrading). + syncCfg := replication.BinlogSyncerConfig{ + ServerID: cfg.ServerID, + Flavor: cfg.Flavor, + Host: cfg.From.Host, + Port: uint16(cfg.From.Port), + User: cfg.From.User, + Password: cfg.From.Password, // plaintext. + UseDecimal: true, + VerifyChecksum: true, + } + tcpReader := reader.NewTCPReader(syncCfg) + // update checkpoint. - err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.EnableGTID) + err = updateSyncerCheckpoint(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)), cfg.SourceID, cfg.EnableGTID, tcpReader, parser2) if err != nil { - return err + return terror.ErrFailUpdateV1DBSchema.Delegate(err) } // update online DDL meta. err = updateSyncerOnlineDDLMeta(tctx, dbConn, cfg.Name, dbutil.TableName(cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)), cfg.SourceID, cfg.ServerID) - return err + return terror.ErrFailUpdateV1DBSchema.Delegate(err) } // updateSyncerCheckpoint updates the checkpoint table of sync unit, including: @@ -55,30 +82,97 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo // - update column value: // - fill `binlog_gtid` based on `binlog_name` and `binlog_pos` if GTID mode enable. // NOTE: no need to update the value of `table_info` because DM can get schema automatically from downstream when replicating DML. -func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName string, fillGTIDs bool) error { +func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, fillGTIDs bool, tcpReader reader.Reader, parser2 *parser.Parser) error { + var gs gtid.Set if fillGTIDs { - // TODO(csuzhangxc): fill `binlog_gtid` based on `binlog_name` and `binlog_pos`. - return errors.New("Not Implemented") + // NOTE: get GTID sets for all (global & tables) binlog position has many problems, at least including: + // - it is a heavy work because it should read binlog events once for each position + // - some binlog file for the position may have already been purge + // so we only get GTID sets for the global position now, + // and this should only have side effects for in-syncing shard tables, but we can mention and warn this case in the user docs. + pos, err := getGlobalPos(tctx, dbConn, tableName, sourceID) + if err != nil { + return err + } + if pos.Name != "" { + realPos, err := binlog.RealMySQLPos(pos) + if err != nil { + return err + } + gs, err = reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos, parser2) + if err != nil { + return err + } + } } // try to add columns. // NOTE: ignore already exists error to continue the process. - sqls := []string{ + queries := []string{ fmt.Sprintf(`ALTER TABLE %s ADD COLUMN binlog_gtid VARCHAR(256) AFTER binlog_pos`, tableName), fmt.Sprintf(`ALTER TABLE %s ADD COLUMN table_info JSON NOT NULL AFTER binlog_gtid`, tableName), } - _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, sqls) - return err + _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, queries) + if err != nil { + return err + } + + if fillGTIDs && gs != nil { + // set binlog_gtid, `gs` should valid here. + err = setGlobalGTIDs(tctx, dbConn, taskName, tableName, sourceID, gs.String()) + if err != nil { + return err + } + } + return nil } // updateSyncerOnlineDDLMeta updates the online DDL meta data, including: // - update the value of `id` from `server-id` to `source-id`. func updateSyncerOnlineDDLMeta(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID string, serverID uint32) error { - sqls := []string{ + queries := []string{ fmt.Sprintf(`UPDATE %s SET id=? WHERE id=?`, tableName), // for multiple columns. } args := []interface{}{sourceID, serverID} - _, err := dbConn.ExecuteSQL(tctx, nil, taskName, sqls, args) + _, err := dbConn.ExecuteSQL(tctx, nil, taskName, queries, args) + return err +} + +// getGlobalPos tries to get the global checkpoint position. +func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sourceID string) (gmysql.Position, error) { + query := fmt.Sprintf(`SELECT binlog_name, binlog_pos FROM %s WHERE id=? AND is_global=? LIMIT 1`, tableName) + args := []interface{}{sourceID, true} + rows, err := dbConn.QuerySQL(tctx, query, args...) + if err != nil { + return gmysql.Position{}, err + } + defer rows.Close() + if !rows.Next() { + return gmysql.Position{}, nil // no global checkpoint position exists. + } + + var ( + name string + pos uint32 + ) + err = rows.Scan(&name, &pos) + if err != nil { + return gmysql.Position{}, err + } + + return gmysql.Position{ + Name: name, + Pos: pos, + }, rows.Err() +} + +// setGlobalGTIDs tries to set `binlog_gtid` for the global checkpoint. +func setGlobalGTIDs(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID, gs string) error { + queries := []string{ + fmt.Sprintf(`UPDATE %s SET binlog_gtid=? WHERE id=? AND is_global=? LIMIT 1`, tableName), + } + args := []interface{}{gs, sourceID, true} + _, err := dbConn.ExecuteSQL(tctx, nil, taskName, queries, args) return err } diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index df245b7481..07551959c6 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -14,59 +14,82 @@ package v1dbschema import ( + "database/sql" "fmt" "io/ioutil" "os" "path/filepath" "runtime" "strconv" + "strings" "testing" . "github.com/pingcap/check" "github.com/pingcap/tidb-tools/pkg/dbutil" + gmysql "github.com/siddontang/go-mysql/mysql" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/cputil" + "github.com/pingcap/dm/pkg/utils" ) func TestSuite(t *testing.T) { TestingT(t) } -type testSchema struct{} +type testSchema struct { + host string + port int + user string + password string + db *conn.BaseDB + sqlDB *sql.DB +} var _ = Suite(&testSchema{}) -func (t *testSchema) setUpDBConn(c *C) *conn.BaseDB { - host := os.Getenv("MYSQL_HOST") - if host == "" { - host = "127.0.0.1" +func (t *testSchema) SetUpSuite(c *C) { + t.setUpDBConn(c) +} + +func (t *testSchema) TestTearDown(c *C) { + t.db.Close() + t.sqlDB.Close() +} + +func (t *testSchema) setUpDBConn(c *C) { + t.host = os.Getenv("MYSQL_HOST") + if t.host == "" { + t.host = "127.0.0.1" } - port, _ := strconv.Atoi(os.Getenv("MYSQL_PORT")) - if port == 0 { - port = 3306 + t.port, _ = strconv.Atoi(os.Getenv("MYSQL_PORT")) + if t.port == 0 { + t.port = 3306 } - user := os.Getenv("MYSQL_USER") - if user == "" { - user = "root" + t.user = os.Getenv("MYSQL_USER") + if t.user == "" { + t.user = "root" } - password := os.Getenv("MYSQL_PSWD") + t.password = os.Getenv("MYSQL_PSWD") cfg := config.DBConfig{ - Host: host, - Port: port, - User: user, - Password: password, + Host: t.host, + Port: t.port, + User: t.user, + Password: t.password, Session: map[string]string{"sql_log_bin ": "off"}, // do not enable binlog to break other unit test cases. } cfg.Adjust() - db, err := conn.DefaultDBProvider.Apply(cfg) + var err error + t.db, err = conn.DefaultDBProvider.Apply(cfg) c.Assert(err, IsNil) - return db + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&sql_log_bin=off", t.user, t.password, t.host, t.port) + t.sqlDB, err = sql.Open("mysql", dsn) + c.Assert(err, IsNil) } func (t *testSchema) TestSchemaV106ToV20x(c *C) { @@ -80,12 +103,16 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { SourceID: "mysql-replica-01", ServerID: 429523137, MetaSchema: "dm_meta_v106_test", + From: config.DBConfig{ + Host: t.host, + Port: t.port, + User: t.user, + Password: t.password, + }, } ) - db := t.setUpDBConn(c) - defer db.Close() - dbConn, err := db.GetBaseConn(tctx.Ctx) + dbConn, err := t.db.GetBaseConn(tctx.Ctx) c.Assert(err, IsNil) defer func() { @@ -116,6 +143,18 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { }) c.Assert(err, IsNil) + // update position according to the current real position. + endPos, endGS, err := utils.GetMasterStatus(t.sqlDB, gmysql.MySQLFlavor) + c.Assert(err, IsNil) + insertCpV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint.sql")) + c.Assert(err, IsNil) + insertCpV106s := strings.ReplaceAll(string(insertCpV106), "123456", strconv.FormatUint(uint64(endPos.Pos), 10)) + // load syncer checkpoint into table. + _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ + insertCpV106s, + }) + c.Assert(err, IsNil) + // load online DDL metadata into table. insertOnV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_onlineddl.sql")) c.Assert(err, IsNil) @@ -125,20 +164,52 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { c.Assert(err, IsNil) // update schema without GTID enabled. - c.Assert(UpdateSchema(tctx, db, cfg), IsNil) + c.Assert(UpdateSchema(tctx, t.db, cfg), IsNil) // verify the column data of online DDL already updated. rows, err := dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT count(*) FROM %s`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name)))) c.Assert(err, IsNil) c.Assert(rows.Next(), IsTrue) var count int - err = rows.Scan(&count) - c.Assert(err, IsNil) + c.Assert(rows.Scan(&count), IsNil) c.Assert(count, Equals, 2) c.Assert(rows.Next(), IsFalse) c.Assert(rows.Err(), IsNil) + rows.Close() + + // verify the column data of checkpoint not updated. + rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) + c.Assert(err, IsNil) + for rows.Next() { + var gs sql.NullString + c.Assert(rows.Scan(&gs), IsNil) + c.Assert(gs.Valid, IsFalse) + } + c.Assert(rows.Err(), IsNil) + rows.Close() // update schema with GTID enabled. cfg.EnableGTID = true - c.Assert(UpdateSchema(tctx, db, cfg), ErrorMatches, ".*Not Implemented.*") + c.Assert(UpdateSchema(tctx, t.db, cfg), IsNil) + + // verify the column data of global checkpoint already updated. + rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s WHERE is_global=1`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) + c.Assert(err, IsNil) + c.Assert(rows.Next(), IsTrue) + var gs sql.NullString + c.Assert(rows.Scan(&gs), IsNil) + c.Assert(gs.String, Equals, endGS.String()) + c.Assert(rows.Next(), IsFalse) + c.Assert(rows.Err(), IsNil) + rows.Close() + + rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s WHERE is_global!=1`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) + c.Assert(err, IsNil) + for rows.Next() { + var gs sql.NullString + c.Assert(rows.Scan(&gs), IsNil) + c.Assert(gs.Valid, IsFalse) + } + c.Assert(rows.Err(), IsNil) + rows.Close() } diff --git a/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql index 4dff209f13..dcb6f85350 100644 --- a/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql +++ b/pkg/v1dbschema/v106_data_for_test/v106_syncer_checkpoint.sql @@ -1,3 +1,3 @@ INSERT INTO `dm_meta_v106_test`.`test_syncer_checkpoint` VALUES -('mysql-replica-01','','','mysql-bin|000001.000001',12641,1,'2020-07-31 18:10:40','2020-07-31 18:11:40'), -('mysql-replica-01','db_single','t1','mysql-bin|000001.000001',12610,0,'2020-07-31 18:10:40','2020-07-31 18:10:40'); +('mysql-replica-01','','','mysql-bin|000001.000001',123456,1,'2020-07-31 18:10:40','2020-07-31 18:11:40'), +('mysql-replica-01','db_single','t1','mysql-bin|000001.000001',123456,0,'2020-07-31 18:10:40','2020-07-31 18:10:40'); From 3ff92ae752a6202459ebbd836e0c1b069d41ea7e Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Tue, 4 Aug 2020 20:51:37 +0800 Subject: [PATCH 07/10] *: debug CI --- pkg/binlog/reader/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index 1a6cbb7fea..b4fd0a9cd7 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -36,7 +36,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse // start to get and parse binlog event from the beginning of the file. startPos := gmysql.Position{ Name: endPos.Name, - Pos: 0, + Pos: 4, } err := r.StartSyncByPos(startPos) if err != nil { From b1d6be732bf5151838a53617bb8a7e3c4515e354 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 5 Aug 2020 11:51:06 +0800 Subject: [PATCH 08/10] *: fix UT --- pkg/binlog/reader/util.go | 2 +- pkg/v1dbschema/schema.go | 30 +++++++++++++++++++++++++----- pkg/v1dbschema/schema_test.go | 26 ++++++++++++++------------ 3 files changed, 40 insertions(+), 18 deletions(-) diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index b4fd0a9cd7..1a6cbb7fea 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -36,7 +36,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse // start to get and parse binlog event from the beginning of the file. startPos := gmysql.Position{ Name: endPos.Name, - Pos: 4, + Pos: 0, } err := r.StartSyncByPos(startPos) if err != nil { diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index 747fd51515..e9110190f8 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -18,11 +18,13 @@ import ( "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser" "github.com/pingcap/tidb-tools/pkg/dbutil" "github.com/pingcap/tidb/errno" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" + "go.uber.org/zap" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/binlog" @@ -95,11 +97,7 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN return err } if pos.Name != "" { - realPos, err := binlog.RealMySQLPos(pos) - if err != nil { - return err - } - gs, err = reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos, parser2) + gs, err = getGTIDsForPos(tctx, pos, tcpReader, parser2) if err != nil { return err } @@ -166,6 +164,28 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour }, rows.Err() } +// getGTIDsForPos gets the GTID sets for the position. +func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader, parser2 *parser.Parser) (gtid.Set, error) { + // NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream, + // we may encounter errors when reading binlog event but cleared by another test case. + failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) { + str := val.(string) + gs, _ := gtid.ParserGTID(gmysql.MySQLFlavor, str) + tctx.L().Info("set gs for position", zap.String("failpoint", "MockGetGTIDsForPos"), zap.Stringer("pos", pos)) + failpoint.Return(gs, nil) + }) + + realPos, err := binlog.RealMySQLPos(pos) + if err != nil { + return nil, err + } + gs, err := reader.GetGTIDsForPos(tctx.Ctx, tcpReader, realPos, parser2) + if err != nil { + return nil, err + } + return gs, nil +} + // setGlobalGTIDs tries to set `binlog_gtid` for the global checkpoint. func setGlobalGTIDs(tctx *tcontext.Context, dbConn *conn.BaseConn, taskName, tableName, sourceID, gs string) error { queries := []string{ diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index 07551959c6..50b9f7cdf1 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -25,6 +25,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb-tools/pkg/dbutil" gmysql "github.com/siddontang/go-mysql/mysql" @@ -32,7 +33,7 @@ import ( "github.com/pingcap/dm/pkg/conn" tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/cputil" - "github.com/pingcap/dm/pkg/utils" + "github.com/pingcap/dm/pkg/gtid" ) func TestSuite(t *testing.T) { @@ -45,7 +46,6 @@ type testSchema struct { user string password string db *conn.BaseDB - sqlDB *sql.DB } var _ = Suite(&testSchema{}) @@ -56,7 +56,6 @@ func (t *testSchema) SetUpSuite(c *C) { func (t *testSchema) TestTearDown(c *C) { t.db.Close() - t.sqlDB.Close() } func (t *testSchema) setUpDBConn(c *C) { @@ -86,10 +85,6 @@ func (t *testSchema) setUpDBConn(c *C) { var err error t.db, err = conn.DefaultDBProvider.Apply(cfg) c.Assert(err, IsNil) - - dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&sql_log_bin=off", t.user, t.password, t.host, t.port) - t.sqlDB, err = sql.Open("mysql", dsn) - c.Assert(err, IsNil) } func (t *testSchema) TestSchemaV106ToV20x(c *C) { @@ -110,8 +105,17 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { Password: t.password, }, } + + endPos = gmysql.Position{ + Name: "mysql-bin|000001.000001", + Pos: 3574, + } + endGS, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "ccb992ad-a557-11ea-ba6a-0242ac140002:1-16") ) + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:1-16")`), IsNil) + defer failpoint.Disable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos") + dbConn, err := t.db.GetBaseConn(tctx.Ctx) c.Assert(err, IsNil) @@ -143,9 +147,7 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { }) c.Assert(err, IsNil) - // update position according to the current real position. - endPos, endGS, err := utils.GetMasterStatus(t.sqlDB, gmysql.MySQLFlavor) - c.Assert(err, IsNil) + // update position. insertCpV106, err := ioutil.ReadFile(filepath.Join(v1DataDir, "v106_syncer_checkpoint.sql")) c.Assert(err, IsNil) insertCpV106s := strings.ReplaceAll(string(insertCpV106), "123456", strconv.FormatUint(uint64(endPos.Pos), 10)) @@ -172,10 +174,10 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { c.Assert(rows.Next(), IsTrue) var count int c.Assert(rows.Scan(&count), IsNil) - c.Assert(count, Equals, 2) c.Assert(rows.Next(), IsFalse) c.Assert(rows.Err(), IsNil) rows.Close() + c.Assert(count, Equals, 2) // verify the column data of checkpoint not updated. rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) @@ -198,10 +200,10 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { c.Assert(rows.Next(), IsTrue) var gs sql.NullString c.Assert(rows.Scan(&gs), IsNil) - c.Assert(gs.String, Equals, endGS.String()) c.Assert(rows.Next(), IsFalse) c.Assert(rows.Err(), IsNil) rows.Close() + c.Assert(gs.String, Equals, endGS.String()) rows, err = dbConn.QuerySQL(tctx, fmt.Sprintf(`SELECT binlog_gtid FROM %s WHERE is_global!=1`, dbutil.TableName(cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name)))) c.Assert(err, IsNil) From fcdeddf647789ce38e060babeaca450c28b71b55 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 5 Aug 2020 13:03:20 +0800 Subject: [PATCH 09/10] v1dbschema: try to fix CI --- pkg/v1dbschema/schema.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index e9110190f8..3e172a843c 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -15,6 +15,7 @@ package v1dbschema import ( "fmt" + "strconv" "github.com/go-sql-driver/mysql" "github.com/pingcap/errors" @@ -94,12 +95,12 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN // and this should only have side effects for in-syncing shard tables, but we can mention and warn this case in the user docs. pos, err := getGlobalPos(tctx, dbConn, tableName, sourceID) if err != nil { - return err + return terror.Annotatef(err, "get global checkpoint position for source %s", sourceID) } if pos.Name != "" { gs, err = getGTIDsForPos(tctx, pos, tcpReader, parser2) if err != nil { - return err + return terror.Annotatef(err, "get GTID sets for position %s", pos) } } } @@ -112,14 +113,14 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN } _, err := dbConn.ExecuteSQLWithIgnoreError(tctx, nil, taskName, ignoreError, queries) if err != nil { - return err + return terror.Annotatef(err, "add columns for checkpoint table") } if fillGTIDs && gs != nil { // set binlog_gtid, `gs` should valid here. err = setGlobalGTIDs(tctx, dbConn, taskName, tableName, sourceID, gs.String()) if err != nil { - return err + return terror.Annotatef(err, "set GTID sets %s for checkpoint table", gs.String()) } } return nil @@ -131,9 +132,9 @@ func updateSyncerOnlineDDLMeta(tctx *tcontext.Context, dbConn *conn.BaseConn, ta queries := []string{ fmt.Sprintf(`UPDATE %s SET id=? WHERE id=?`, tableName), // for multiple columns. } - args := []interface{}{sourceID, serverID} + args := []interface{}{sourceID, strconv.FormatUint(uint64(serverID), 10)} _, err := dbConn.ExecuteSQL(tctx, nil, taskName, queries, args) - return err + return terror.Annotatef(err, "update id column for online DDL meta table") } // getGlobalPos tries to get the global checkpoint position. From 00da495d8b4e522ef929c7e0dbf81df8cb74752e Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 5 Aug 2020 15:54:30 +0800 Subject: [PATCH 10/10] *: address comments --- pkg/binlog/reader/util.go | 3 ++- pkg/binlog/reader/util_test.go | 2 +- pkg/v1dbschema/schema.go | 3 ++- pkg/v1dbschema/schema_test.go | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/binlog/reader/util.go b/pkg/binlog/reader/util.go index 1a6cbb7fea..d0b8fc9a34 100644 --- a/pkg/binlog/reader/util.go +++ b/pkg/binlog/reader/util.go @@ -42,6 +42,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse if err != nil { return nil, err } + defer r.Close() var ( flavor string @@ -129,7 +130,7 @@ func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parse } return latestGTIDs, nil } else if latestPos > endPos.Pos { - return nil, errors.Errorf("no GTIDs get for position %s", endPos) + return nil, errors.Errorf("invalid position %s or GTID not enabled in upstream", endPos) } } } diff --git a/pkg/binlog/reader/util_test.go b/pkg/binlog/reader/util_test.go index 777324ce34..1412bdfaa8 100644 --- a/pkg/binlog/reader/util_test.go +++ b/pkg/binlog/reader/util_test.go @@ -63,6 +63,6 @@ func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { Name: endPos.Name, Pos: endPos.Pos - 1, }, parser2) - c.Assert(err, ErrorMatches, ".*no GTIDs get for position.*") + c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*") c.Assert(gs, IsNil) } diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index 3e172a843c..533183ef72 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -47,6 +47,7 @@ func UpdateSchema(tctx *tcontext.Context, db *conn.BaseDB, cfg *config.SubTaskCo if err != nil { return terror.ErrFailUpdateV1DBSchema.Delegate(err) } + defer db.CloseBaseConn(dbConn) // setup SQL parser. parser2, err := utils.GetParser(db.DB, cfg.EnableANSIQuotes) @@ -91,7 +92,7 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN // NOTE: get GTID sets for all (global & tables) binlog position has many problems, at least including: // - it is a heavy work because it should read binlog events once for each position // - some binlog file for the position may have already been purge - // so we only get GTID sets for the global position now, + // so we only get GTID sets for the global position now, // and this should only have side effects for in-syncing shard tables, but we can mention and warn this case in the user docs. pos, err := getGlobalPos(tctx, dbConn, tableName, sourceID) if err != nil { diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index 50b9f7cdf1..c9e60bad7a 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -78,7 +78,7 @@ func (t *testSchema) setUpDBConn(c *C) { Port: t.port, User: t.user, Password: t.password, - Session: map[string]string{"sql_log_bin ": "off"}, // do not enable binlog to break other unit test cases. + Session: map[string]string{"sql_log_bin": "off"}, // do not enable binlog to break other unit test cases. } cfg.Adjust()