diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 1d86195946..5b27950b8b 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -283,7 +283,8 @@ ErrSyncerUnitResolveCasualityFail,[code=36056:class=sync-unit:scope=internal:lev ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:level=high],"reopen %T not supported" ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported" ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync" -ErrSyncerUnitNewBWList,[code=36060:class=sync-unit:scope=internal:level=high],"new black white list" +ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list" +ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s" ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid" ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported" ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid" diff --git a/pkg/binlog/position.go b/pkg/binlog/position.go index 45f8113ac3..0d29209e6e 100644 --- a/pkg/binlog/position.go +++ b/pkg/binlog/position.go @@ -132,3 +132,31 @@ func verifyUUIDSuffix(suffix string) bool { } return true } + +// AdjustPosition adjusts the filename with uuid suffix in mysql position +// for example: mysql-bin|000001.000002 -> mysql-bin.000002 +func AdjustPosition(pos gmysql.Position) gmysql.Position { + realPos, err := RealMySQLPos(pos) + if err != nil { + // just return the origin pos + return pos + } + + return realPos +} + +// ComparePosition returns: +// 1 if pos1 is bigger than pos2 +// 0 if pos1 is equal to pos2 +// -1 if pos1 is less than pos2 +func ComparePosition(pos1, pos2 gmysql.Position) int { + adjustedPos1 := AdjustPosition(pos1) + adjustedPos2 := AdjustPosition(pos2) + + // means both pos1 and pos2 have uuid in name, so need also compare the uuid + if adjustedPos1.Name != pos1.Name && adjustedPos2.Name != pos2.Name { + return pos1.Compare(pos2) + } + + return adjustedPos1.Compare(adjustedPos2) +} diff --git a/pkg/binlog/position_test.go b/pkg/binlog/position_test.go index 3dd747e37a..6302eb7ec1 100644 --- a/pkg/binlog/position_test.go +++ b/pkg/binlog/position_test.go @@ -230,3 +230,120 @@ func (t *testPositionSuite) TestVerifyUUIDSuffix(c *C) { c.Assert(verifyUUIDSuffix(cs.suffix), Equals, cs.valid) } } + +func (t *testPositionSuite) TestAdjustPosition(c *C) { + cases := []struct { + pos gmysql.Position + adjustedPos gmysql.Position + }{ + { + gmysql.Position{ + "mysql-bin.00001", + 123, + }, + gmysql.Position{ + "mysql-bin.00001", + 123, + }, + }, { + gmysql.Position{ + "mysql-bin|00001.00002", + 123, + }, + gmysql.Position{ + "mysql-bin.00002", + 123, + }, + }, { + gmysql.Position{ + "mysql-bin|00001.00002.00003", + 123, + }, + gmysql.Position{ + "mysql-bin|00001.00002.00003", + 123, + }, + }, + } + + for _, cs := range cases { + adjustedPos := AdjustPosition(cs.pos) + c.Assert(adjustedPos.Name, Equals, cs.adjustedPos.Name) + c.Assert(adjustedPos.Pos, Equals, cs.adjustedPos.Pos) + } +} + +func (t *testPositionSuite) TestComparePosition(c *C) { + cases := []struct { + pos1 gmysql.Position + pos2 gmysql.Position + cmp int + }{ + { + gmysql.Position{ + Name: "mysql-bin.00001", + Pos: 123, + }, + gmysql.Position{ + Name: "mysql-bin.00002", + Pos: 123, + }, + -1, + }, { + gmysql.Position{ + Name: "mysql-bin.00001", + Pos: 123, + }, + gmysql.Position{ + Name: "mysql-bin.00001", + Pos: 123, + }, + 0, + }, { + gmysql.Position{ + Name: "mysql-bin.00002", + Pos: 123, + }, + gmysql.Position{ + Name: "mysql-bin.00001", + Pos: 123, + }, + 1, + }, { + gmysql.Position{ + Name: "mysql-bin|00001.00002", + Pos: 123, + }, + gmysql.Position{ + Name: "mysql-bin|00002.00001", + Pos: 123, + }, + -1, + }, { + gmysql.Position{ + Name: "mysql-bin|00001.00002", + Pos: 123, + }, + gmysql.Position{ + Name: "mysql-bin|00001.00002", + Pos: 123, + }, + 0, + }, { + gmysql.Position{ + Name: "mysql-bin|00002.00001", + Pos: 123, + }, + gmysql.Position{ + Name: "mysql-bin|00001.00002", + Pos: 123, + }, + 1, + }, + } + + for _, cs := range cases { + cmp := ComparePosition(cs.pos1, cs.pos2) + c.Assert(cmp, Equals, cs.cmp) + } +} diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index c01b6ec672..f5ff82b34f 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -349,7 +349,8 @@ const ( codeSyncerUnitReopenStreamNotSupport codeSyncerUnitUpdateConfigInSharding codeSyncerUnitExecWithNoBlockingDDL - codeSyncerUnitNewBWList + codeSyncerUnitGenBWList + codeSyncerUnitHandleDDLFailed ) // DM-master error code @@ -796,7 +797,8 @@ var ( ErrSyncerUnitReopenStreamNotSupport = New(codeSyncerUnitReopenStreamNotSupport, ClassSyncUnit, ScopeInternal, LevelHigh, "reopen %T not supported") ErrSyncerUnitUpdateConfigInSharding = New(codeSyncerUnitUpdateConfigInSharding, ClassSyncUnit, ScopeInternal, LevelHigh, "try update config when some tables' (%v) sharding DDL not synced not supported") ErrSyncerUnitExecWithNoBlockingDDL = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync") - ErrSyncerUnitNewBWList = New(codeSyncerUnitNewBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "new black white list") + ErrSyncerUnitGenBWList = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list") + ErrSyncerUnitHandleDDLFailed = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s") // DM-master error ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid") diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index e266544449..031108a650 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -164,6 +164,9 @@ type CheckPoint interface { // corresponding to to Meta.Pos GlobalPoint() mysql.Position + // TablePoint returns all table's stream checkpoint + TablePoint() map[string]map[string]mysql.Position + // FlushedGlobalPoint returns the flushed global binlog stream's checkpoint // corresponding to to Meta.Pos FlushedGlobalPoint() mysql.Position @@ -421,6 +424,21 @@ func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position { return cp.globalPoint.MySQLPos() } +// TablePoint implements CheckPoint.TablePoint +func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position { + cp.RLock() + defer cp.RUnlock() + + tablePoint := make(map[string]map[string]mysql.Position) + for schema, tables := range cp.points { + tablePoint[schema] = make(map[string]mysql.Position) + for table, point := range tables { + tablePoint[schema][table] = point.MySQLPos() + } + } + return tablePoint +} + // FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position { return cp.globalPoint.FlushedMySQLPos() diff --git a/syncer/db.go b/syncer/db.go index 2385608d58..64963c606f 100644 --- a/syncer/db.go +++ b/syncer/db.go @@ -215,6 +215,14 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter } func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) { + failpoint.Inject("ExecuteSQLWithIgnoreFailed", func(val failpoint.Value) { + queryPattern := val.(string) + if len(queries) == 1 && strings.Contains(queries[0], queryPattern) { + tctx.L().Warn("executeSQLWithIgnore failed", zap.String("failpoint", "ExecuteSQLWithIgnoreFailed")) + failpoint.Return(0, terror.ErrDBUnExpect.Generate("invalid connection")) + } + }) + if len(queries) == 0 { return 0, nil } diff --git a/syncer/online_ddl.go b/syncer/online_ddl.go index 8b5705592b..48333919f7 100644 --- a/syncer/online_ddl.go +++ b/syncer/online_ddl.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/parser/ast" "github.com/pingcap/tidb-tools/pkg/filter" + "go.uber.org/zap" ) var ( @@ -176,6 +177,10 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo { return nil } + if mSchema == nil || mSchema[ghostTable] == nil { + return nil + } + clone := new(GhostDDLInfo) *clone = *mSchema[ghostTable] @@ -204,6 +209,10 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable, // maybe we meed more checks for it + if len(info.DDLs) != 0 && info.DDLs[len(info.DDLs)-1] == ddl { + tctx.L().Warn("online ddl may be saved before, just ignore it", zap.String("ddl", ddl)) + return nil + } info.DDLs = append(info.DDLs, ddl) ddlsBytes, err := json.Marshal(mSchema[ghostTable]) if err != nil { diff --git a/syncer/syncer.go b/syncer/syncer.go index 5d70dd9209..7175b8e101 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -318,7 +318,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) { s.bwList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BWList) if err != nil { - return terror.ErrSyncerUnitNewBWList.Delegate(err) + return terror.ErrSyncerUnitGenBWList.Delegate(err) } s.binlogFilter, err = bf.NewBinlogEvent(s.cfg.CaseSensitive, s.cfg.FilterRules) @@ -470,7 +470,8 @@ func (s *Syncer) initShardingGroups() error { // IsFreshTask implements Unit.IsFreshTask func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) { globalPoint := s.checkpoint.GlobalPoint() - return globalPoint.Compare(minCheckpoint) <= 0, nil + tablePoint := s.checkpoint.TablePoint() + return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0 && len(tablePoint) == 0, nil } func (s *Syncer) resetReplicationSyncer() { @@ -1115,9 +1116,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("statck")) err = terror.ErrSyncerUnitPanic.Generate(err1) } - // flush the jobs channels, but if error occurred, we should not flush the checkpoints - if err1 := s.flushJobs(); err1 != nil { - s.tctx.L().Error("fail to finish all jobs when binlog replication exits", log.ShortError(err1)) + + s.jobWg.Wait() + if err2 := s.flushCheckPoints(); err2 != nil { + s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2)) } }() @@ -1722,11 +1724,19 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e s.tctx.L().Info("replace ddls to preset ddls by sql operator in normal mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) needHandleDDLs = appliedSQLs // maybe nil } + job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID) err = s.addJobFunc(job) if err != nil { return err } + + // when add ddl job, will execute ddl and then flush checkpoint. + // if execute ddl failed, the execErrorDetected will be true. + if s.execErrorDetected.Get() { + return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + } + s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos)) for _, tbl := range targetTbls { @@ -1909,6 +1919,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e return err } + if s.execErrorDetected.Get() { + return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query) + } + if len(onlineDDLTableNames) > 0 { err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) if err != nil { @@ -2347,7 +2361,7 @@ func (s *Syncer) Update(cfg *config.SubTaskConfig) error { oldBwList = s.bwList s.bwList, err = filter.New(cfg.CaseSensitive, cfg.BWList) if err != nil { - return terror.ErrSyncerUnitNewBWList.Delegate(err) + return terror.ErrSyncerUnitGenBWList.Delegate(err) } // update route diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go index ce3ae57a2a..0da9ebc1ff 100644 --- a/syncer/syncer_test.go +++ b/syncer/syncer_test.go @@ -1502,10 +1502,6 @@ func (s *testSyncerSuite) TestRun(c *C) { update, "REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);", []interface{}{int64(580981944116838401), "b"}, - }, { - flush, - "", - nil, }, } @@ -1566,10 +1562,6 @@ func (s *testSyncerSuite) TestRun(c *C) { del, "DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;", []interface{}{int32(3)}, - }, { - flush, - "", - nil, }, } diff --git a/tests/online_ddl/conf/diff_config.toml b/tests/online_ddl/conf/diff_config.toml index 293fca2678..fddb4e18fa 100644 --- a/tests/online_ddl/conf/diff_config.toml +++ b/tests/online_ddl/conf/diff_config.toml @@ -23,7 +23,7 @@ tables = ["t_target"] schema = "online_ddl" table = "t_target" ignore-columns = ["id"] -is-online_ddl = true +is-sharding = true index-field = "uid" [[table-config.source-tables]] diff --git a/tests/online_ddl/conf/dm-task.yaml b/tests/online_ddl/conf/dm-task.yaml index 4957af3ebb..3889d18256 100644 --- a/tests/online_ddl/conf/dm-task.yaml +++ b/tests/online_ddl/conf/dm-task.yaml @@ -3,7 +3,7 @@ name: test task-mode: all is-sharding: true meta-schema: "dm_meta" -remove-meta: true +remove-meta: false disable-heartbeat: true timezone: "Asia/Shanghai" online-ddl-scheme: online-ddl-scheme-placeholder