diff --git a/dm/dm/worker/task_checker.go b/dm/dm/worker/task_checker.go index 2cacdc8e9a0..9b4bfcfa6e2 100644 --- a/dm/dm/worker/task_checker.go +++ b/dm/dm/worker/task_checker.go @@ -104,33 +104,13 @@ type TaskStatusChecker interface { // NewTaskStatusChecker is a TaskStatusChecker initializer. var NewTaskStatusChecker = NewRealTaskStatusChecker -type backoffController struct { - // task name -> backoff counter - backoffs map[string]*backoff.Backoff - - // task name -> task latest paused time that checker observes - latestPausedTime map[string]time.Time - - // task name -> task latest block time, block means task paused with un-resumable error - latestBlockTime map[string]time.Time - - // task name -> the latest auto resume time - latestResumeTime map[string]time.Time - - latestRelayPausedTime time.Time - latestRelayBlockTime time.Time - latestRelayResumeTime time.Time - relayBackoff *backoff.Backoff -} - -// newBackoffController returns a new backoffController instance. -func newBackoffController() *backoffController { - return &backoffController{ - backoffs: make(map[string]*backoff.Backoff), - latestPausedTime: make(map[string]time.Time), - latestBlockTime: make(map[string]time.Time), - latestResumeTime: make(map[string]time.Time), - } +// AutoResumeInfo contains some Time and Backoff that are related to auto resume. +// This structure is exposed for DM as library. +type AutoResumeInfo struct { + Backoff *backoff.Backoff + LatestPausedTime time.Time + LatestBlockTime time.Time + LatestResumeTime time.Time } // realTaskStatusChecker is not thread-safe. @@ -145,16 +125,18 @@ type realTaskStatusChecker struct { cfg config.CheckerConfig l log.Logger w *SourceWorker - bc *backoffController + + subtaskAutoResume map[string]*AutoResumeInfo + relayAutoResume *AutoResumeInfo } // NewRealTaskStatusChecker creates a new realTaskStatusChecker instance. func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker { tsc := &realTaskStatusChecker{ - cfg: cfg, - l: log.With(zap.String("component", "task checker")), - w: w, - bc: newBackoffController(), + cfg: cfg, + l: log.With(zap.String("component", "task checker")), + w: w, + subtaskAutoResume: map[string]*AutoResumeInfo{}, } tsc.closed.Store(true) return tsc @@ -254,7 +236,18 @@ func isResumableError(err *pb.ProcessError) bool { return true } -func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, duration time.Duration) ResumeStrategy { +// CheckResumeSubtask updates info and returns ResumeStrategy for a subtask. +// When ResumeDispatch and the subtask is successfully resumed at caller, +// LatestResumeTime and backoff should be updated. +// This function is exposed for DM as library. +func (i *AutoResumeInfo) CheckResumeSubtask( + stStatus *pb.SubTaskStatus, + backoffRollback time.Duration, +) (strategy ResumeStrategy) { + defer func() { + i.update(strategy, backoffRollback) + }() + // task that is not paused or paused manually, just ignore it if stStatus == nil || stStatus.Stage != pb.Stage_Paused || stStatus.Result == nil || stStatus.Result.IsCanceled { return ResumeIgnore @@ -265,21 +258,28 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, pErr := processErr if !isResumableError(processErr) { failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) { - tsc.l.Info("error is not resumable", zap.Stringer("error", pErr)) + log.L().Info("error is not resumable", zap.Stringer("error", pErr)) }) return ResumeNoSense } } // auto resume interval does not exceed backoff duration, skip this paused task - if time.Since(tsc.bc.latestResumeTime[stStatus.Name]) < duration { + if time.Since(i.LatestResumeTime) < i.Backoff.Current() { return ResumeSkip } return ResumeDispatch } -func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy { +func (i *AutoResumeInfo) checkResumeRelay( + relayStatus *pb.RelayStatus, + backoffRollback time.Duration, +) (strategy ResumeStrategy) { + defer func() { + i.update(strategy, backoffRollback) + }() + // relay that is not paused or paused manually, just ignore it if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled { return ResumeIgnore @@ -291,52 +291,64 @@ func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelaySt } } - if time.Since(tsc.bc.latestRelayResumeTime) < duration { + if time.Since(i.LatestResumeTime) < i.Backoff.Current() { return ResumeSkip } return ResumeDispatch } -func (tsc *realTaskStatusChecker) checkRelayStatus() { - relayStatus := tsc.w.relayHolder.Status(nil) - if tsc.bc.relayBackoff == nil { - tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) - tsc.bc.latestRelayPausedTime = time.Now() - tsc.bc.latestRelayResumeTime = time.Now() - } - rbf := tsc.bc.relayBackoff - duration := rbf.Current() - strategy := tsc.getRelayResumeStrategy(relayStatus, duration) +func (i *AutoResumeInfo) update(strategy ResumeStrategy, backoffRollback time.Duration) { switch strategy { case ResumeIgnore: - if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration { - rbf.Rollback() + if time.Since(i.LatestPausedTime) > backoffRollback { + i.Backoff.Rollback() // after each rollback, reset this timer - tsc.bc.latestRelayPausedTime = time.Now() + i.LatestPausedTime = time.Now() } case ResumeNoSense: // this strategy doesn't forward or rollback backoff - tsc.bc.latestRelayPausedTime = time.Now() - blockTime := tsc.bc.latestRelayBlockTime - if !blockTime.IsZero() { - tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime))) - } else { - tsc.bc.latestRelayBlockTime = time.Now() - tsc.l.Warn("relay can't auto resume") + i.LatestPausedTime = time.Now() + if i.LatestBlockTime.IsZero() { + i.LatestBlockTime = time.Now() + } + case ResumeSkip, ResumeDispatch: + i.LatestPausedTime = time.Now() + } +} + +func (tsc *realTaskStatusChecker) checkRelayStatus() { + relayStatus := tsc.w.relayHolder.Status(nil) + if tsc.relayAutoResume == nil { + bf, _ := backoff.NewBackoff( + tsc.cfg.BackoffFactor, + tsc.cfg.BackoffJitter, + tsc.cfg.BackoffMin.Duration, + tsc.cfg.BackoffMax.Duration) + tsc.relayAutoResume = &AutoResumeInfo{ + Backoff: bf, + LatestResumeTime: time.Now(), + LatestPausedTime: time.Now(), } + } + + strategy := tsc.relayAutoResume.checkResumeRelay(relayStatus, tsc.cfg.BackoffRollback.Duration) + switch strategy { + case ResumeNoSense: + tsc.l.Warn("relay can't auto resume", + zap.Duration("paused duration", time.Since(tsc.relayAutoResume.LatestBlockTime))) case ResumeSkip: - tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration)) - tsc.bc.latestRelayPausedTime = time.Now() + tsc.l.Warn("backoff skip auto resume relay", + zap.Time("latestResumeTime", tsc.relayAutoResume.LatestResumeTime), + zap.Duration("duration", tsc.relayAutoResume.Backoff.Current())) case ResumeDispatch: - tsc.bc.latestRelayPausedTime = time.Now() err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay) if err != nil { tsc.l.Error("dispatch auto resume relay failed", zap.Error(err)) } else { tsc.l.Info("dispatch auto resume relay") - tsc.bc.latestRelayResumeTime = time.Now() - rbf.BoundaryForward() + tsc.relayAutoResume.LatestResumeTime = time.Now() + tsc.relayAutoResume.Backoff.BoundaryForward() } } } @@ -346,57 +358,49 @@ func (tsc *realTaskStatusChecker) checkTaskStatus() { defer func() { // cleanup outdated tasks - for taskName := range tsc.bc.backoffs { + for taskName := range tsc.subtaskAutoResume { _, ok := allSubTaskStatus[taskName] if !ok { tsc.l.Debug("remove task from checker", zap.String("task", taskName)) - delete(tsc.bc.backoffs, taskName) - delete(tsc.bc.latestPausedTime, taskName) - delete(tsc.bc.latestBlockTime, taskName) - delete(tsc.bc.latestResumeTime, taskName) + delete(tsc.subtaskAutoResume, taskName) } } }() for taskName, stStatus := range allSubTaskStatus { - bf, ok := tsc.bc.backoffs[taskName] + info, ok := tsc.subtaskAutoResume[taskName] if !ok { - bf, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration) - tsc.bc.backoffs[taskName] = bf - tsc.bc.latestPausedTime[taskName] = time.Now() - tsc.bc.latestResumeTime[taskName] = time.Now() + bf, _ := backoff.NewBackoff( + tsc.cfg.BackoffFactor, + tsc.cfg.BackoffJitter, + tsc.cfg.BackoffMin.Duration, + tsc.cfg.BackoffMax.Duration) + info = &AutoResumeInfo{ + Backoff: bf, + LatestPausedTime: time.Now(), + LatestResumeTime: time.Now(), + } + tsc.subtaskAutoResume[taskName] = info } - duration := bf.Current() - strategy := tsc.getResumeStrategy(stStatus, duration) + strategy := info.CheckResumeSubtask(stStatus, tsc.cfg.BackoffRollback.Duration) switch strategy { - case ResumeIgnore: - if time.Since(tsc.bc.latestPausedTime[taskName]) > tsc.cfg.BackoffRollback.Duration { - bf.Rollback() - // after each rollback, reset this timer - tsc.bc.latestPausedTime[taskName] = time.Now() - } case ResumeNoSense: - // this strategy doesn't forward or rollback backoff - tsc.bc.latestPausedTime[taskName] = time.Now() - blockTime, ok := tsc.bc.latestBlockTime[taskName] - if ok { - tsc.l.Warn("task can't auto resume", zap.String("task", taskName), zap.Duration("paused duration", time.Since(blockTime))) - } else { - tsc.bc.latestBlockTime[taskName] = time.Now() - tsc.l.Warn("task can't auto resume", zap.String("task", taskName)) - } + tsc.l.Warn("task can't auto resume", + zap.String("task", taskName), + zap.Duration("paused duration", time.Since(info.LatestBlockTime))) case ResumeSkip: - tsc.l.Warn("backoff skip auto resume task", zap.String("task", taskName), zap.Time("latestResumeTime", tsc.bc.latestResumeTime[taskName]), zap.Duration("duration", duration)) - tsc.bc.latestPausedTime[taskName] = time.Now() + tsc.l.Warn("backoff skip auto resume task", + zap.String("task", taskName), + zap.Time("latestResumeTime", info.LatestResumeTime), + zap.Duration("duration", info.Backoff.Current())) case ResumeDispatch: - tsc.bc.latestPausedTime[taskName] = time.Now() err := tsc.w.OperateSubTask(taskName, pb.TaskOp_AutoResume) if err != nil { tsc.l.Error("dispatch auto resume task failed", zap.String("task", taskName), zap.Error(err)) } else { tsc.l.Info("dispatch auto resume task", zap.String("task", taskName)) - tsc.bc.latestResumeTime[taskName] = time.Now() - bf.BoundaryForward() + info.LatestResumeTime = time.Now() + info.Backoff.BoundaryForward() } } } diff --git a/dm/dm/worker/task_checker_test.go b/dm/dm/worker/task_checker_test.go index c15abf97091..963f2f6e881 100644 --- a/dm/dm/worker/task_checker_test.go +++ b/dm/dm/worker/task_checker_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tiflow/dm/dm/config" "github.com/pingcap/tiflow/dm/dm/pb" "github.com/pingcap/tiflow/dm/dm/unit" + "github.com/pingcap/tiflow/dm/pkg/backoff" "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -33,8 +34,8 @@ var _ = check.Suite(&testTaskCheckerSuite{}) type testTaskCheckerSuite struct{} var ( - unsupporteModifyColumnError = unit.NewProcessError(terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState})) - unknownProcessError = unit.NewProcessError(errors.New("error mesage")) + unsupportedModifyColumnError = unit.NewProcessError(terror.ErrDBExecuteFailed.Delegate(&tmysql.SQLError{Code: 1105, Message: "unsupported modify column length 20 is less than origin 40", State: tmysql.DefaultMySQLState})) + unknownProcessError = unit.NewProcessError(errors.New("error message")) ) func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { @@ -54,7 +55,7 @@ func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Running}, now, time.Duration(0), 1 * time.Millisecond, ResumeIgnore}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused}, now, time.Duration(0), 1 * time.Millisecond, ResumeIgnore}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: true}}, now, time.Duration(0), 1 * time.Millisecond, ResumeIgnore}, - {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false, Errors: []*pb.ProcessError{unsupporteModifyColumnError}}}, now, time.Duration(0), 1 * time.Millisecond, ResumeNoSense}, + {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false, Errors: []*pb.ProcessError{unsupportedModifyColumnError}}}, now, time.Duration(0), 1 * time.Millisecond, ResumeNoSense}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false}}, now, time.Duration(0), 1 * time.Second, ResumeSkip}, {&pb.SubTaskStatus{Name: taskName, Stage: pb.Stage_Paused, Result: &pb.ProcessResult{IsCanceled: false}}, now, -2 * time.Millisecond, 1 * time.Millisecond, ResumeDispatch}, } @@ -70,8 +71,16 @@ func (s *testTaskCheckerSuite) TestResumeStrategy(c *check.C) { for _, tc := range testCases { rtsc, ok := tsc.(*realTaskStatusChecker) c.Assert(ok, check.IsTrue) - rtsc.bc.latestResumeTime[taskName] = tc.latestResumeFn(tc.addition) - strategy := rtsc.getResumeStrategy(tc.status, tc.duration) + bf, _ := backoff.NewBackoff( + 1, + false, + tc.duration, + tc.duration) + rtsc.subtaskAutoResume[taskName] = &AutoResumeInfo{ + Backoff: bf, + LatestResumeTime: tc.latestResumeFn(tc.addition), + } + strategy := rtsc.subtaskAutoResume[taskName].CheckResumeSubtask(tc.status, config.DefaultBackoffRollback) c.Assert(strategy, check.Equals, tc.expected) } } @@ -114,8 +123,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { c.Assert(st.cfg.Adjust(false), check.IsNil) rtsc.w.subTaskHolder.recordSubTask(st) rtsc.check() - bf, ok := rtsc.bc.backoffs[taskName] - c.Assert(ok, check.IsTrue) + bf := rtsc.subtaskAutoResume[taskName].Backoff // test resume with paused task st.stage = pb.Stage_Paused @@ -141,14 +149,14 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { // test no sense strategy st.result = &pb.ProcessResult{ IsCanceled: false, - Errors: []*pb.ProcessError{unsupporteModifyColumnError}, + Errors: []*pb.ProcessError{unsupportedModifyColumnError}, } rtsc.check() - c.Assert(latestPausedTime.Before(rtsc.bc.latestPausedTime[taskName]), check.IsTrue) - latestBlockTime = rtsc.bc.latestBlockTime[taskName] + c.Assert(latestPausedTime.Before(rtsc.subtaskAutoResume[taskName].LatestPausedTime), check.IsTrue) + latestBlockTime = rtsc.subtaskAutoResume[taskName].LatestBlockTime time.Sleep(200 * time.Millisecond) rtsc.check() - c.Assert(rtsc.bc.latestBlockTime[taskName], check.Equals, latestBlockTime) + c.Assert(rtsc.subtaskAutoResume[taskName].LatestBlockTime, check.Equals, latestBlockTime) c.Assert(bf.Current(), check.Equals, current) // test resume skip strategy @@ -171,8 +179,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { } rtsc.w.subTaskHolder.recordSubTask(st) rtsc.check() - bf, ok = rtsc.bc.backoffs[taskName] - c.Assert(ok, check.IsTrue) + bf = rtsc.subtaskAutoResume[taskName].Backoff st.stage = pb.Stage_Paused st.result = &pb.ProcessResult{ @@ -180,14 +187,14 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) { Errors: []*pb.ProcessError{unknownProcessError}, } rtsc.check() - latestResumeTime = rtsc.bc.latestResumeTime[taskName] - latestPausedTime = rtsc.bc.latestPausedTime[taskName] + latestResumeTime = rtsc.subtaskAutoResume[taskName].LatestResumeTime + latestPausedTime = rtsc.subtaskAutoResume[taskName].LatestPausedTime c.Assert(bf.Current(), check.Equals, 10*time.Second) for i := 0; i < 10; i++ { rtsc.check() - c.Assert(latestResumeTime, check.Equals, rtsc.bc.latestResumeTime[taskName]) - c.Assert(latestPausedTime.Before(rtsc.bc.latestPausedTime[taskName]), check.IsTrue) - latestPausedTime = rtsc.bc.latestPausedTime[taskName] + c.Assert(latestResumeTime, check.Equals, rtsc.subtaskAutoResume[taskName].LatestResumeTime) + c.Assert(latestPausedTime.Before(rtsc.subtaskAutoResume[taskName].LatestPausedTime), check.IsTrue) + latestPausedTime = rtsc.subtaskAutoResume[taskName].LatestPausedTime } } @@ -235,10 +242,10 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { } rtsc.w.subTaskHolder.recordSubTask(st2) rtsc.check() - c.Assert(len(rtsc.bc.backoffs), check.Equals, 2) - c.Assert(len(rtsc.bc.latestPausedTime), check.Equals, 2) - c.Assert(len(rtsc.bc.latestResumeTime), check.Equals, 2) - c.Assert(len(rtsc.bc.latestBlockTime), check.Equals, 0) + c.Assert(len(rtsc.subtaskAutoResume), check.Equals, 2) + for _, times := range rtsc.subtaskAutoResume { + c.Assert(times.LatestBlockTime.IsZero(), check.IsTrue) + } // test backoff strategies of different tasks do not affect each other st1 = &SubTask{ @@ -246,7 +253,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { stage: pb.Stage_Paused, result: &pb.ProcessResult{ IsCanceled: false, - Errors: []*pb.ProcessError{unsupporteModifyColumnError}, + Errors: []*pb.ProcessError{unsupportedModifyColumnError}, }, l: log.With(zap.String("subtask", task1)), } @@ -264,26 +271,26 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) { c.Assert(st2.cfg.Adjust(false), check.IsNil) rtsc.w.subTaskHolder.recordSubTask(st2) - task1LatestResumeTime = rtsc.bc.latestResumeTime[task1] - task2LatestResumeTime = rtsc.bc.latestResumeTime[task2] + task1LatestResumeTime = rtsc.subtaskAutoResume[task1].LatestResumeTime + task2LatestResumeTime = rtsc.subtaskAutoResume[task2].LatestResumeTime for i := 0; i < 10; i++ { time.Sleep(backoffMin) rtsc.check() - c.Assert(task1LatestResumeTime, check.Equals, rtsc.bc.latestResumeTime[task1]) - c.Assert(task2LatestResumeTime.Before(rtsc.bc.latestResumeTime[task2]), check.IsTrue) - c.Assert(len(rtsc.bc.latestBlockTime), check.Equals, 1) - task2LatestResumeTime = rtsc.bc.latestResumeTime[task2] + c.Assert(task1LatestResumeTime, check.Equals, rtsc.subtaskAutoResume[task1].LatestResumeTime) + c.Assert(task2LatestResumeTime.Before(rtsc.subtaskAutoResume[task2].LatestResumeTime), check.IsTrue) + c.Assert(rtsc.subtaskAutoResume[task1].LatestBlockTime.IsZero(), check.IsFalse) + c.Assert(rtsc.subtaskAutoResume[task2].LatestBlockTime.IsZero(), check.IsTrue) + + task2LatestResumeTime = rtsc.subtaskAutoResume[task2].LatestResumeTime } // test task information cleanup in task status checker rtsc.w.subTaskHolder.removeSubTask(task1) time.Sleep(backoffMin) rtsc.check() - c.Assert(task2LatestResumeTime.Before(rtsc.bc.latestResumeTime[task2]), check.IsTrue) - c.Assert(len(rtsc.bc.backoffs), check.Equals, 1) - c.Assert(len(rtsc.bc.latestPausedTime), check.Equals, 1) - c.Assert(len(rtsc.bc.latestResumeTime), check.Equals, 1) - c.Assert(len(rtsc.bc.latestBlockTime), check.Equals, 0) + c.Assert(task2LatestResumeTime.Before(rtsc.subtaskAutoResume[task2].LatestResumeTime), check.IsTrue) + c.Assert(len(rtsc.subtaskAutoResume), check.Equals, 1) + c.Assert(rtsc.subtaskAutoResume[task2].LatestBlockTime.IsZero(), check.IsTrue) } func (s *testTaskCheckerSuite) TestIsResumableError(c *check.C) {