diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt
index c7d8754601..c385ba9da5 100644
--- a/_utils/terror_gen/errors_release.txt
+++ b/_utils/terror_gen/errors_release.txt
@@ -287,6 +287,7 @@ ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:l
 ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
 ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
 ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
+ErrSyncerUnitHandleDDLFailed,[code=36061:class=sync-unit:scope=internal:level=high],"fail to handle ddl job for %s"
 ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
 ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
 ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go
index 77a3f94769..4381fee0b9 100644
--- a/pkg/terror/error_list.go
+++ b/pkg/terror/error_list.go
@@ -357,6 +357,7 @@ const (
 	codeSyncerUnitUpdateConfigInSharding
 	codeSyncerUnitExecWithNoBlockingDDL
 	codeSyncerUnitGenBWList
+	codeSyncerUnitHandleDDLFailed
 )
 
 // DM-master error code
@@ -828,6 +829,7 @@ var (
 	ErrSyncerUnitUpdateConfigInSharding = New(codeSyncerUnitUpdateConfigInSharding, ClassSyncUnit, ScopeInternal, LevelHigh, "try update config when some tables' (%v) sharding DDL not synced not supported")
 	ErrSyncerUnitExecWithNoBlockingDDL  = New(codeSyncerUnitExecWithNoBlockingDDL, ClassSyncUnit, ScopeInternal, LevelHigh, "process unit not waiting for sharding DDL to sync")
 	ErrSyncerUnitGenBWList              = New(codeSyncerUnitGenBWList, ClassSyncUnit, ScopeInternal, LevelHigh, "generate black white list")
+	ErrSyncerUnitHandleDDLFailed        = New(codeSyncerUnitHandleDDLFailed, ClassSyncUnit, ScopeInternal, LevelHigh, "fail to handle ddl job for %s")
 
 	// DM-master error
 	ErrMasterSQLOpNilRequest = New(codeMasterSQLOpNilRequest, ClassDMMaster, ScopeInternal, LevelMedium, "nil request not valid")
diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go
index e3e566d5a5..3507610fc5 100644
--- a/syncer/checkpoint.go
+++ b/syncer/checkpoint.go
@@ -189,6 +189,9 @@ type CheckPoint interface {
 	// corresponding to to Meta.Pos
 	GlobalPoint() mysql.Position
 
+	// TablePoint returns all table's stream checkpoint
+	TablePoint() map[string]map[string]mysql.Position
+
 	// FlushedGlobalPoint returns the flushed global binlog stream's checkpoint
 	// corresponding to to Meta.Pos
 	FlushedGlobalPoint() mysql.Position
@@ -473,6 +476,21 @@ func (cp *RemoteCheckPoint) GlobalPoint() mysql.Position {
 	return cp.globalPoint.MySQLPos()
 }
 
+// TablePoint implements CheckPoint.TablePoint
+func (cp *RemoteCheckPoint) TablePoint() map[string]map[string]mysql.Position {
+	cp.RLock()
+	defer cp.RUnlock()
+
+	tablePoint := make(map[string]map[string]mysql.Position)
+	for schema, tables := range cp.points {
+		tablePoint[schema] = make(map[string]mysql.Position)
+		for table, point := range tables {
+			tablePoint[schema][table] = point.MySQLPos()
+		}
+	}
+	return tablePoint
+}
+
 // FlushedGlobalPoint implements CheckPoint.FlushedGlobalPoint
 func (cp *RemoteCheckPoint) FlushedGlobalPoint() mysql.Position {
 	return cp.globalPoint.FlushedMySQLPos()
diff --git a/syncer/db.go b/syncer/db.go
index 5108097031..72f9cbd55b 100644
--- a/syncer/db.go
+++ b/syncer/db.go
@@ -15,6 +15,7 @@ package syncer
 
 import (
 	"database/sql"
+	"strings"
 	"time"
 
 	"github.com/pingcap/dm/dm/config"
@@ -192,6 +193,14 @@ func (conn *DBConn) querySQL(tctx *tcontext.Context, query string, args ...inter
 }
 
 func (conn *DBConn) executeSQLWithIgnore(tctx *tcontext.Context, ignoreError func(error) bool, queries []string, args ...[]interface{}) (int, error) {
+	failpoint.Inject("ExecuteSQLWithIgnoreFailed", func(val failpoint.Value) {
+		queryPattern := val.(string)
+		if len(queries) == 1 && strings.Contains(queries[0], queryPattern) {
+			tctx.L().Warn("executeSQLWithIgnore failed", zap.String("failpoint", "ExecuteSQLWithIgnoreFailed"))
+			failpoint.Return(0, terror.ErrDBUnExpect.Generate("invalid connection"))
+		}
+	})
+
 	if len(queries) == 0 {
 		return 0, nil
 	}
diff --git a/syncer/online_ddl.go b/syncer/online_ddl.go
index 8b5705592b..48333919f7 100644
--- a/syncer/online_ddl.go
+++ b/syncer/online_ddl.go
@@ -27,6 +27,7 @@ import (
 
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/tidb-tools/pkg/filter"
+	"go.uber.org/zap"
 )
 
 var (
@@ -176,6 +177,10 @@ func (s *OnlineDDLStorage) Get(ghostSchema, ghostTable string) *GhostDDLInfo {
 		return nil
 	}
 
+	if mSchema == nil || mSchema[ghostTable] == nil {
+		return nil
+	}
+
 	clone := new(GhostDDLInfo)
 	*clone = *mSchema[ghostTable]
 
@@ -204,6 +209,10 @@ func (s *OnlineDDLStorage) Save(tctx *tcontext.Context, ghostSchema, ghostTable,
 
 	// maybe we meed more checks for it
 
+	if len(info.DDLs) != 0 && info.DDLs[len(info.DDLs)-1] == ddl {
+		tctx.L().Warn("online ddl may be saved before, just ignore it", zap.String("ddl", ddl))
+		return nil
+	}
 	info.DDLs = append(info.DDLs, ddl)
 	ddlsBytes, err := json.Marshal(mSchema[ghostTable])
 	if err != nil {
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 75576de3f6..f794c9b896 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -420,7 +420,8 @@ func (s *Syncer) initShardingGroups() error {
 // IsFreshTask implements Unit.IsFreshTask
 func (s *Syncer) IsFreshTask(ctx context.Context) (bool, error) {
 	globalPoint := s.checkpoint.GlobalPoint()
-	return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0, nil
+	tablePoint := s.checkpoint.TablePoint()
+	return binlog.ComparePosition(globalPoint, minCheckpoint) <= 0 && len(tablePoint) == 0, nil
 }
 
 func (s *Syncer) reset() {
@@ -1054,9 +1055,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
 			s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("statck"))
 			err = terror.ErrSyncerUnitPanic.Generate(err1)
 		}
-		// flush the jobs channels, but if error occurred, we should not flush the checkpoints
-		if err1 := s.flushJobs(); err1 != nil {
-			s.tctx.L().Error("fail to finish all jobs when binlog replication exits", log.ShortError(err1))
+
+		s.jobWg.Wait()
+		if err2 := s.flushCheckPoints(); err2 != nil {
+			s.tctx.L().Warn("fail to flush check points when exit task", zap.Error(err2))
 		}
 	}()
 
@@ -1675,11 +1677,19 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
 			s.tctx.L().Info("replace ddls to preset ddls by sql operator in normal mode", zap.String("event", "query"), zap.Strings("preset ddls", appliedSQLs), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
 			needHandleDDLs = appliedSQLs // maybe nil
 		}
+
 		job := newDDLJob(nil, needHandleDDLs, *ec.lastPos, *ec.currentPos, nil, nil, *ec.traceID)
 		err = s.addJobFunc(job)
 		if err != nil {
 			return err
 		}
+
+		// when add ddl job, will execute ddl and then flush checkpoint.
+		// if execute ddl failed, the execErrorDetected will be true.
+		if s.execErrorDetected.Get() {
+			return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
+		}
+
 		s.tctx.L().Info("finish to handle ddls in normal mode", zap.String("event", "query"), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), log.WrapStringerField("position", ec.currentPos))
 
 		for _, td := range needTrackDDLs {
@@ -1872,6 +1882,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
 		return err
 	}
 
+	if s.execErrorDetected.Get() {
+		return terror.ErrSyncerUnitHandleDDLFailed.Generate(ev.Query)
+	}
+
 	if len(onlineDDLTableNames) > 0 {
 		err = s.clearOnlineDDL(ec.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
 		if err != nil {
diff --git a/syncer/syncer_test.go b/syncer/syncer_test.go
index 95adc8f9fb..d0b3f0c13c 100644
--- a/syncer/syncer_test.go
+++ b/syncer/syncer_test.go
@@ -1498,10 +1498,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
 			update,
 			"REPLACE INTO `test_1`.`t_1` (`id`,`name`) VALUES (?,?);",
 			[]interface{}{int64(580981944116838401), "b"},
-		}, {
-			flush,
-			"",
-			nil,
 		},
 	}
 
@@ -1561,10 +1557,6 @@ func (s *testSyncerSuite) TestRun(c *C) {
 			del,
 			"DELETE FROM `test_1`.`t_2` WHERE `id` = ? LIMIT 1;",
 			[]interface{}{int32(3)},
-		}, {
-			flush,
-			"",
-			nil,
 		},
 	}
 
diff --git a/tests/online_ddl/conf/diff_config.toml b/tests/online_ddl/conf/diff_config.toml
index 293fca2678..fddb4e18fa 100644
--- a/tests/online_ddl/conf/diff_config.toml
+++ b/tests/online_ddl/conf/diff_config.toml
@@ -23,7 +23,7 @@ tables = ["t_target"]
 schema = "online_ddl"
 table = "t_target"
 ignore-columns = ["id"]
-is-online_ddl = true
+is-sharding = true
 index-field = "uid"
 
 [[table-config.source-tables]]
diff --git a/tests/online_ddl/conf/dm-task.yaml b/tests/online_ddl/conf/dm-task.yaml
index 4957af3ebb..3889d18256 100644
--- a/tests/online_ddl/conf/dm-task.yaml
+++ b/tests/online_ddl/conf/dm-task.yaml
@@ -3,7 +3,7 @@ name: test
 task-mode: all
 is-sharding: true
 meta-schema: "dm_meta"
-remove-meta: true
+remove-meta: false
 disable-heartbeat: true
 timezone: "Asia/Shanghai"
 online-ddl-scheme: online-ddl-scheme-placeholder