From 10e3e04c8e9da29420b34c2bdb972fc1cae2051c Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 15:26:45 +0800 Subject: [PATCH 1/9] initial cherry-pick --- syncer/job.go | 30 +++++++++++++++++++-------- syncer/job_test.go | 2 +- syncer/syncer.go | 48 +++++++++++++++++++++++++++++-------------- syncer/syncer_test.go | 17 +++++++++++++++ syncer/util.go | 25 ++++++++++++++++++++++ syncer/util_test.go | 24 ++++++++++++++++++++++ 6 files changed, 122 insertions(+), 24 deletions(-) diff --git a/syncer/job.go b/syncer/job.go index c67cec147b..3285196dce 100644 --- a/syncer/job.go +++ b/syncer/job.go @@ -59,9 +59,11 @@ func (t opType) String() string { } type job struct { - tp opType - sourceSchema string - sourceTable string + tp opType + // ddl in ShardOptimistic and ShardPessimistic will only affect one table at one time but for normal node + // we don't have this limit. So we should update multi tables in normal mode. + // sql example: drop table `s1`.`t1`, `s2`.`t2`. + sourceTbl map[string][]string targetSchema string targetTable string sql string @@ -89,8 +91,7 @@ func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql } return &job{ tp: tp, - sourceSchema: sourceSchema, - sourceTable: sourceTable, + sourceTbl: map[string][]string{sourceSchema: {sourceTable}}, targetSchema: targetSchema, targetTable: targetTable, sql: sql, @@ -104,7 +105,10 @@ func newJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql } } -func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, pos, cmdPos mysql.Position, currentGtidSet gtid.Set, ddlExecItem *DDLExecItem, traceID string) *job { +// newDDL job is used to create a new ddl job +// when cfg.ShardMode == "", ddlInfo == nil,sourceTbls != nil, we use sourceTbls to record ddl affected tables. +// when cfg.ShardMode == ShardOptimistic || ShardPessimistic, ddlInfo != nil, sourceTbls == nil. 
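+// in both cases the schemas and tables recorded in job.sourceTbl are later used by addJob to
+// save the corresponding table checkpoints at the DDL's position.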
+func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, pos, cmdPos mysql.Position, currentGtidSet gtid.Set, ddlExecItem *DDLExecItem, traceID string, sourceTbls map[string]map[string]struct{}) *job { var gs gtid.Set if currentGtidSet != nil { gs = currentGtidSet.Clone() @@ -120,10 +124,20 @@ func newDDLJob(ddlInfo *shardingDDLInfo, ddls []string, pos, cmdPos mysql.Positi } if ddlInfo != nil { - j.sourceSchema = ddlInfo.tableNames[0][0].Schema - j.sourceTable = ddlInfo.tableNames[0][0].Name + j.sourceTbl = map[string][]string{ddlInfo.tableNames[0][0].Schema: {ddlInfo.tableNames[0][0].Name}} j.targetSchema = ddlInfo.tableNames[1][0].Schema j.targetTable = ddlInfo.tableNames[1][0].Name + } else if sourceTbls != nil { + sourceTbl := make(map[string][]string, len(sourceTbls)) + for schema, tbMap := range sourceTbls { + if len(tbMap) > 0 { + sourceTbl[schema] = make([]string, 0, len(tbMap)) + } + for name := range tbMap { + sourceTbl[schema] = append(sourceTbl[schema], name) + } + } + j.sourceTbl = sourceTbl } if ddlExecItem != nil && ddlExecItem.req != nil { diff --git a/syncer/job_test.go b/syncer/job_test.go index 9ef3af43dd..ecb96a3abf 100644 --- a/syncer/job_test.go +++ b/syncer/job_test.go @@ -97,7 +97,7 @@ func (t *testJobSuite) TestJob(c *C) { newJob(insert, "test", "t1", "test", "t1", "insert into test.t1 values(?)", []interface{}{1}, "1", mysql.Position{}, mysql.Position{}, nil, ""), "tp: insert, sql: insert into test.t1 values(?), args: [1], key: 1, ddls: [], last_pos: (, 0), current_pos: (, 0), gtid:", }, { - newDDLJob(ddlInfo, []string{"create database test"}, mysql.Position{}, mysql.Position{}, nil, ddlExecItem, ""), + newDDLJob(ddlInfo, []string{"create database test"}, mysql.Position{}, mysql.Position{}, nil, ddlExecItem, "", nil), "tp: ddl, sql: , args: [], key: , ddls: [create database test], last_pos: (, 0), current_pos: (, 0), gtid:", }, { newXIDJob(mysql.Position{}, mysql.Position{}, nil, ""), diff --git a/syncer/syncer.go b/syncer/syncer.go index ad0dad86e6..74e3112d6b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -801,17 +801,31 @@ func (s *Syncer) addJob(job *job) error { switch job.tp { case ddl: + failpoint.Inject("ExitAfterDDLBeforeFlush", func() { + s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ExitAfterDDLBeforeFlush")) + utils.OsExit(1) + }) // only save checkpoint for DDL and XID (see above) s.saveGlobalPoint(job.pos) - if len(job.sourceSchema) > 0 { - s.checkpoint.SaveTablePoint(job.sourceSchema, job.sourceTable, job.pos) + for sourceSchema, tbs := range job.sourceTbl { + if len(sourceSchema) == 0 { + continue + } + for _, sourceTable := range tbs { + s.checkpoint.SaveTablePoint(sourceSchema, sourceTable, job.pos) + } } // reset sharding group after checkpoint saved s.resetShardingGroup(job.targetSchema, job.targetTable) case insert, update, del: // save job's current pos for DML events - if len(job.sourceSchema) > 0 { - s.checkpoint.SaveTablePoint(job.sourceSchema, job.sourceTable, job.currentPos) + for sourceSchema, tbs := range job.sourceTbl { + if len(sourceSchema) == 0 { + continue + } + for _, sourceTable := range tbs { + s.checkpoint.SaveTablePoint(sourceSchema, sourceTable, job.currentPos) + } } } @@ -1653,7 +1667,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e var ( ddlInfo *shardingDDLInfo needHandleDDLs []string - targetTbls = make(map[string]*filter.Table) + sourceTbls = make(map[string]map[string]struct{}) // db name -> tb name ) for _, sql := range sqls { sqlDDL, tableNames, stmt, 
handleErr := s.handleDDL(ec.parser2, usedSchema, sql) @@ -1712,7 +1726,9 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } needHandleDDLs = append(needHandleDDLs, sqlDDL) - targetTbls[tableNames[1][0].String()] = tableNames[1][0] + // TODO: current table checkpoints will be deleted in track ddls, but created and updated in flush checkpoints, + // we should use a better mechanism to combine these operations + recordSourceTbls(sourceTbls, stmt, tableNames[0][0]) } s.tctx.L().Info("prepare to handle ddls", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) @@ -1741,7 +1757,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e needHandleDDLs = appliedSQLs // maybe nil } - job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID) + if err := s.flushJobs(); err != nil { + return err + } + + job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID, sourceTbls) err = s.addJobFunc(job) if err != nil { return err @@ -1755,12 +1775,6 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) - for _, tbl := range targetTbls { - s.clearTables(tbl.Schema, tbl.Name) - // save checkpoint of each table - s.checkpoint.SaveTablePoint(tbl.Schema, tbl.Name, *ec.currentPos) - } - for _, table := range onlineDDLTableNames { s.tctx.L().Info("finish online ddl and clear online ddl metadata in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.String("schema", table.Schema), zap.String("table", table.Name)) err = s.onlineDDL.Finish(ec.tctx, table.Schema, table.Name) @@ -1816,6 +1830,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start position", startPos), zap.Bool("is-synced", synced), zap.Int("unsynced", remain)) + if err := s.flushJobs(); err != nil { + return err + } + if needShardingHandle { target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain)) @@ -1861,7 +1879,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } // Don't send new DDLInfo to dm-master until all local sql jobs finished - s.jobWg.Wait() + // since jobWg is flushed by flushJobs before, we don't wait here any more // NOTE: if we need singleton Syncer (without dm-master) to support sharding DDL sync // we should add another config item to differ, and do not save DDLInfo, and not wait for ddlExecInfo @@ -1929,7 +1947,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e s.tctx.L().Info("replace ddls to preset ddls by sql operator in shard mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Stringer("start position", startPos), log.WrapStringerField("end 
position", ec.currentPos)) needHandleDDLs = appliedSQLs // maybe nil } - job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, ddlExecItem, *ec.traceID) + job := newDDLJob(ddlInfo, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, ddlExecItem, *ec.traceID, nil) err = s.addJobFunc(job) if err != nil { return err diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 71db9cda10..095b07629b 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1462,14 +1462,27 @@ func (s *testSyncerSuite) TestRun(c *C) { go syncer.Process(ctx, resultCh) expectJobs1 := []*expectJob{ + // now every ddl job will start with a flush job { + flush, + "", + nil, + }, { ddl, "CREATE DATABASE IF NOT EXISTS `test_1`", nil, + }, { + flush, + "", + nil, }, { ddl, "CREATE TABLE IF NOT EXISTS `test_1`.`t_1` (`id` INT PRIMARY KEY,`name` VARCHAR(24))", nil, + }, { + flush, + "", + nil, }, { ddl, "CREATE TABLE IF NOT EXISTS `test_1`.`t_2` (`id` INT PRIMARY KEY,`name` VARCHAR(24))", @@ -1478,6 +1491,10 @@ func (s *testSyncerSuite) TestRun(c *C) { insert, "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?)", []interface{}{int64(580981944116838401), "a"}, + }, { + flush, + "", + nil, }, { ddl, "ALTER TABLE `test_1`.`t_1` ADD INDEX `index1`(`name`)", diff --git a/syncer/util.go b/syncer/util.go index 11682dd517..ee7c88727b 100644 --- a/syncer/util.go +++ b/syncer/util.go @@ -20,6 +20,7 @@ import ( "strings" "github.com/pingcap/parser/ast" + "github.com/pingcap/tidb-tools/pkg/filter" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/pkg/terror" @@ -95,3 +96,27 @@ func getDBConfigFromEnv() config.DBConfig { Port: port, } } + +// record source tbls record the tables that need to flush checkpoints +func recordSourceTbls(sourceTbls map[string]map[string]struct{}, stmt ast.StmtNode, table *filter.Table) { + schema, name := table.Schema, table.Name + switch stmt.(type) { + // these ddls' relative table checkpoints will be deleted during track ddl, + // so we shouldn't flush these checkpoints + case *ast.DropDatabaseStmt: + delete(sourceTbls, schema) + case *ast.DropTableStmt: + if _, ok := sourceTbls[schema]; ok { + delete(sourceTbls[schema], name) + } + // these ddls won't update schema tracker, no need to update them + case *ast.LockTablesStmt, *ast.UnlockTablesStmt, *ast.CleanupTableLockStmt, *ast.TruncateTableStmt: + break + // flush other tables schema tracker info into checkpoint + default: + if _, ok := sourceTbls[schema]; !ok { + sourceTbls[schema] = make(map[string]struct{}) + } + sourceTbls[schema][name] = struct{}{} + } +} diff --git a/syncer/util_test.go b/syncer/util_test.go index 175f444fa6..5487a8652a 100644 --- a/syncer/util_test.go +++ b/syncer/util_test.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb-tools/pkg/filter" _ "github.com/pingcap/tidb/types/parser_driver" ) @@ -123,3 +124,26 @@ func (t *testUtilSuite) TestTableNameResultSet(c *C) { c.Assert(schema, Equals, "test") c.Assert(table, Equals, "t1") } + +func (t *testUtilSuite) TestRecordSourceTbls(c *C) { + sourceTbls := make(map[string]map[string]struct{}) + + recordSourceTbls(sourceTbls, &ast.CreateDatabaseStmt{}, &filter.Table{Schema: "a", Name: ""}) + c.Assert(sourceTbls, HasKey, "a") + c.Assert(sourceTbls["a"], HasKey, "") + + recordSourceTbls(sourceTbls, &ast.CreateTableStmt{}, &filter.Table{Schema: "a", Name: "b"}) + c.Assert(sourceTbls, HasKey, "a") + c.Assert(sourceTbls["a"], HasKey, "b") + + 
recordSourceTbls(sourceTbls, &ast.DropTableStmt{}, &filter.Table{Schema: "a", Name: "b"}) + _, ok := sourceTbls["a"]["b"] + c.Assert(ok, IsFalse) + + recordSourceTbls(sourceTbls, &ast.CreateTableStmt{}, &filter.Table{Schema: "a", Name: "c"}) + c.Assert(sourceTbls, HasKey, "a") + c.Assert(sourceTbls["a"], HasKey, "c") + + recordSourceTbls(sourceTbls, &ast.DropDatabaseStmt{}, &filter.Table{Schema: "a", Name: ""}) + c.Assert(sourceTbls, HasLen, 0) +} From 39ec9b2ae7cfda8374a66ad524da80f6214810e7 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 15:58:45 +0800 Subject: [PATCH 2/9] add integration_test --- .../sequence_sharding/data/db1.increment.sql | 3 ++ .../sequence_sharding/data/db1.increment2.sql | 4 +++ .../sequence_sharding/data/db2.increment.sql | 4 +++ .../sequence_sharding/data/db2.increment2.sql | 5 +++ tests/sequence_sharding/run.sh | 32 +++++++++++++++++++ 5 files changed, 48 insertions(+) create mode 100644 tests/sequence_sharding/data/db1.increment2.sql create mode 100644 tests/sequence_sharding/data/db2.increment2.sql diff --git a/tests/sequence_sharding/data/db1.increment.sql b/tests/sequence_sharding/data/db1.increment.sql index 801c992577..6e1f66d819 100644 --- a/tests/sequence_sharding/data/db1.increment.sql +++ b/tests/sequence_sharding/data/db1.increment.sql @@ -20,3 +20,6 @@ update t2 set d = 200; update t1 set c = 101; update t2 set c = 102; insert into t1 (uid,name,c) values(100004,'VALUES',191472878),(100005,'jAPlnzXli',1091218279); + +alter table t1 add column f int; +alter table t2 add column f int; \ No newline at end of file diff --git a/tests/sequence_sharding/data/db1.increment2.sql b/tests/sequence_sharding/data/db1.increment2.sql new file mode 100644 index 0000000000..788f39dc93 --- /dev/null +++ b/tests/sequence_sharding/data/db1.increment2.sql @@ -0,0 +1,4 @@ +use sharding_seq; +insert into t1 values(15, "i", 15, 15, 15, 15); +alter table t1 drop column f; +alter table t2 drop column f; \ No newline at end of file diff --git a/tests/sequence_sharding/data/db2.increment.sql b/tests/sequence_sharding/data/db2.increment.sql index 6bd92ca190..fc9b4dc7c1 100644 --- a/tests/sequence_sharding/data/db2.increment.sql +++ b/tests/sequence_sharding/data/db2.increment.sql @@ -25,3 +25,7 @@ update t4 set d = 200; update t4 set uid=uid+100000; insert into t2 (uid,name,c) values(300003,'nvWgBf',73),(300004,'nD1000',4029); insert into t3 (uid,name,c) values(400004,'1000',1000); + +alter table t2 add column f int; +alter table t3 add column f int; +alter table t4 add column f int; diff --git a/tests/sequence_sharding/data/db2.increment2.sql b/tests/sequence_sharding/data/db2.increment2.sql new file mode 100644 index 0000000000..d7c624f3cf --- /dev/null +++ b/tests/sequence_sharding/data/db2.increment2.sql @@ -0,0 +1,5 @@ +use sharding_seq; +insert into t2 values(16, "j", 16, 16, 16, 16); +alter table t2 drop column f; +alter table t3 drop column f; +alter table t4 drop column f; \ No newline at end of file diff --git a/tests/sequence_sharding/run.sh b/tests/sequence_sharding/run.sh index 8124319480..9abaf59466 100755 --- a/tests/sequence_sharding/run.sh +++ b/tests/sequence_sharding/run.sh @@ -28,6 +28,38 @@ function run() { # use sync_diff_inspector to check data now! 
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + kill_dm_worker + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + + export GO_FAILPOINTS='github.com/pingcap/dm/syncer/ExitAfterDDLBeforeFlush=return(true)' + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.increment2.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + + export GO_FAILPOINTS='' + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + # use sync_diff_inspector to check data now! + echo "check sync diff for the increment replication" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 4 \ + "\"unit\": \"Sync\"" 2 } cleanup_data sharding_target2 From aab400086b1bf366ec77ef701ec7908a2cd68e4f Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 19:01:37 +0800 Subject: [PATCH 3/9] fix bug --- dm/master/status_test.go | 1 + syncer/syncer.go | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/dm/master/status_test.go b/dm/master/status_test.go index b3b5300b78..56fbc7a273 100644 --- a/dm/master/status_test.go +++ b/dm/master/status_test.go @@ -42,6 +42,7 @@ func (t *testHTTPServer) startServer(c *check.C) { c.Assert(err, check.IsNil) }() + time.Sleep(time.Second) // wait server fully started err := t.waitUntilServerOnline() c.Assert(err, check.IsNil) } diff --git a/syncer/syncer.go b/syncer/syncer.go index 74e3112d6b..700d01a388 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1668,6 +1668,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e ddlInfo *shardingDDLInfo needHandleDDLs []string sourceTbls = make(map[string]map[string]struct{}) // db name -> tb name + targetTbls = make(map[string]*filter.Table) ) for _, sql := range sqls { sqlDDL, tableNames, stmt, handleErr := s.handleDDL(ec.parser2, usedSchema, sql) @@ -1729,6 +1730,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e // TODO: current table checkpoints will be deleted in track ddls, but created and updated in flush checkpoints, // we should use a better mechanism to combine these operations recordSourceTbls(sourceTbls, stmt, tableNames[0][0]) + targetTbls[tableNames[1][0].String()] = tableNames[1][0] } s.tctx.L().Info("prepare to handle ddls", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) @@ -1773,6 +1775,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) } + for _, tbl := range targetTbls { + s.clearTables(tbl.Schema, tbl.Name) + } + s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), 
zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) for _, table := range onlineDDLTableNames { From e6b0e540f2d5642529e68e27b22ec73ce34d1e88 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 19:40:55 +0800 Subject: [PATCH 4/9] fix sequence_sharding --- tests/sequence_sharding/data/db1.increment2.sql | 6 +++--- tests/sequence_sharding/data/db2.increment2.sql | 6 +++--- tests/sequence_sharding/run.sh | 4 +++- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/sequence_sharding/data/db1.increment2.sql b/tests/sequence_sharding/data/db1.increment2.sql index 788f39dc93..29893920c4 100644 --- a/tests/sequence_sharding/data/db1.increment2.sql +++ b/tests/sequence_sharding/data/db1.increment2.sql @@ -1,4 +1,4 @@ -use sharding_seq; -insert into t1 values(15, "i", 15, 15, 15, 15); +use `sharding_seq`; +insert into t1 (uid,name,c,d,e,f) values (15, "i", 15, 15, 15, 15); alter table t1 drop column f; -alter table t2 drop column f; \ No newline at end of file +alter table t2 drop column f; diff --git a/tests/sequence_sharding/data/db2.increment2.sql b/tests/sequence_sharding/data/db2.increment2.sql index d7c624f3cf..67581e43cc 100644 --- a/tests/sequence_sharding/data/db2.increment2.sql +++ b/tests/sequence_sharding/data/db2.increment2.sql @@ -1,5 +1,5 @@ -use sharding_seq; -insert into t2 values(16, "j", 16, 16, 16, 16); +use `sharding_seq`; +insert into t2 (uid,name,c,d,e,f) values (16, "j", 16, 16, 16, 16); alter table t2 drop column f; alter table t3 drop column f; -alter table t4 drop column f; \ No newline at end of file +alter table t4 drop column f; diff --git a/tests/sequence_sharding/run.sh b/tests/sequence_sharding/run.sh index 9abaf59466..6fb5982b26 100755 --- a/tests/sequence_sharding/run.sh +++ b/tests/sequence_sharding/run.sh @@ -41,6 +41,7 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + sleep 1 check_port_offline $WORKER1_PORT 20 check_port_offline $WORKER2_PORT 20 @@ -51,13 +52,14 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + sleep 1 # use sync_diff_inspector to check data now! echo "check sync diff for the increment replication" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status test" \ + "query-status sequence_sharding" \ "\"stage\": \"Running\"" 4 \ "\"unit\": \"Sync\"" 2 } From 966635da2059faf8dd981ef24eb880f75996faa5 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Tue, 21 Apr 2020 22:41:17 +0800 Subject: [PATCH 5/9] refactor UT --- syncer/syncer_test.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 095b07629b..51991f42e1 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -218,6 +218,12 @@ func (s *testSyncerSuite) mockParser(db *sql.DB, mock sqlmock.Sqlmock) (*parser. 
return utils.GetParser(db, false) } +func (s *testSyncerSuite) mockCheckPointMeta(checkPointMock sqlmock.Sqlmock) { + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() +} + func (s *testSyncerSuite) mockCheckPointCreate(checkPointMock sqlmock.Sqlmock) { checkPointMock.ExpectBegin() checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -1262,8 +1268,11 @@ func (s *testSyncerSuite) TestSharding(c *C) { AddRow("sql_mode", "ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION")) // mock checkpoint db after create db table1 table2 + s.mockCheckPointMeta(checkPointMock) s.mockCheckPointCreate(checkPointMock) + s.mockCheckPointMeta(checkPointMock) s.mockCheckPointCreate(checkPointMock) + s.mockCheckPointMeta(checkPointMock) s.mockCheckPointCreate(checkPointMock) // mock downstream db result @@ -1286,6 +1295,21 @@ func (s *testSyncerSuite) TestSharding(c *C) { "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"}, ).AddRow("st", 0, "PRIMARY", 1, "id", "A", 0, null, null, null, "BTREE", "", "")) + // before first ddl, we only flush global checkpoint + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + // before second ddl, we flush the saved table checkpoint t1 + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + // after ddl is synced, we flush global checkpoint and saved table point t2 + checkPointMock.ExpectBegin() + checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectExec(fmt.Sprintf("DELETE FROM `%s`.`%s_syncer_sharding_meta", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) + checkPointMock.ExpectCommit() + // mock expect sql for i, expectSQL := range _case.expectSQLS { mock.ExpectBegin() @@ -1299,7 +1323,6 @@ func (s *testSyncerSuite) TestSharding(c *C) { sqlmock.NewRows([]string{"Table", "Non_unique", "Key_name", "Seq_in_index", "Column_name", "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"}, ).AddRow("st", 0, "PRIMARY", 1, "id", "A", 0, null, null, null, "BTREE", "", "")) - s.mockCheckPointFlush(checkPointMock) } else { // change insert to replace because of safe mode mock.ExpectExec(expectSQL.sql).WithArgs(expectSQL.args...).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -1363,7 +1386,6 @@ func (s *testSyncerSuite) TestRun(c *C) { // 3. check the generated jobs // 4. update config, add route rules, and update syncer // 5. 
execute somes sqls and then check jobs generated - db, mock, err := sqlmock.New() c.Assert(err, IsNil) dbConn, err := db.Conn(context.Background()) From 4212ec47f142d521b767d5c28a57b6c577b7a5bd Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Thu, 23 Apr 2020 23:40:46 +0800 Subject: [PATCH 6/9] address comment --- syncer/syncer.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 700d01a388..5c6afbb55b 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1747,6 +1747,11 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e *ec.traceID = traceEvent.Base.TraceID } + // flush previous DMLs and checkpoint if needing to handle the DDL. + // NOTE: do this flush before operations on shard groups which may lead to skip a table caused by `UnresolvedTables`. + if err := s.flushJobs(); err != nil { + return err + } if !s.cfg.IsSharding { s.tctx.L().Info("start to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) // try apply SQL operator before addJob. now, one query event only has one DDL job, if updating to multi DDL jobs, refine this. @@ -1759,10 +1764,6 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e needHandleDDLs = appliedSQLs // maybe nil } - if err := s.flushJobs(); err != nil { - return err - } - job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID, sourceTbls) err = s.addJobFunc(job) if err != nil { @@ -1836,10 +1837,6 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start position", startPos), zap.Bool("is-synced", synced), zap.Int("unsynced", remain)) - if err := s.flushJobs(); err != nil { - return err - } - if needShardingHandle { target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain)) From bbffedf9bbb65125c340cbbe9ffc81f8e69610ec Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Fri, 24 Apr 2020 10:21:19 +0800 Subject: [PATCH 7/9] address comment --- syncer/syncer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/syncer/syncer.go b/syncer/syncer.go index 5c6afbb55b..53d9173326 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -1727,8 +1727,6 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e } needHandleDDLs = append(needHandleDDLs, sqlDDL) - // TODO: current table checkpoints will be deleted in track ddls, but created and updated in flush checkpoints, - // we should use a better mechanism to combine these operations recordSourceTbls(sourceTbls, stmt, tableNames[0][0]) targetTbls[tableNames[1][0].String()] = tableNames[1][0] } From 2b2fb556bbf96ae1fafce46aa6d6dd167fa1f348 Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 26 Apr 2020 20:02:03 +0800 Subject: [PATCH 8/9] update UT --- syncer/syncer_test.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 51991f42e1..9c4c96fcea 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1295,15 +1295,13 @@ func (s *testSyncerSuite) 
TestSharding(c *C) { "Collation", "Cardinality", "Sub_part", "Packed", "Null", "Index_type", "Comment", "Index_comment"}, ).AddRow("st", 0, "PRIMARY", 1, "id", "A", 0, null, null, null, "BTREE", "", "")) - // before first ddl, we only flush global checkpoint - checkPointMock.ExpectBegin() - checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) - checkPointMock.ExpectCommit() - // before second ddl, we flush the saved table checkpoint t1 + // before first ddl, we flush the saved global checkpoint and table point + s.mockCheckPointFlush(checkPointMock) + // before second ddl, we flush on the global checkpoint because the two table are in sgk checkPointMock.ExpectBegin() checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) checkPointMock.ExpectCommit() - // after ddl is synced, we flush global checkpoint and saved table point t2 + // after ddl group is synced, update two table points. global checkpoint is flushed before so we don't flush it again checkPointMock.ExpectBegin() checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) checkPointMock.ExpectExec(fmt.Sprintf("INSERT INTO `%s`.`%s_syncer_checkpoint`", s.cfg.MetaSchema, s.cfg.Name)).WillReturnResult(sqlmock.NewResult(1, 1)) @@ -1332,8 +1330,6 @@ func (s *testSyncerSuite) TestSharding(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) - s.mockCheckPointFlush(checkPointMock) - go syncer.Process(ctx, resultCh) var wg sync.WaitGroup From 010592c25f3b94793f13833df4b56231aefe0c1e Mon Sep 17 00:00:00 2001 From: Chunzhu Li Date: Sun, 26 Apr 2020 20:29:43 +0800 Subject: [PATCH 9/9] add syncer test --- syncer/syncer_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index 9c4c96fcea..109cd37a74 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1330,6 +1330,8 @@ func (s *testSyncerSuite) TestSharding(c *C) { ctx, cancel := context.WithCancel(context.Background()) resultCh := make(chan pb.ProcessResult) + s.mockCheckPointFlush(checkPointMock) + go syncer.Process(ctx, resultCh) var wg sync.WaitGroup