Skip to content

Commit

Permalink
task_checker(dm): refactor and expose an auto resume function (#5004)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
lance6716 committed Mar 28, 2022
1 parent e342c02 commit bddf162
Show file tree
Hide file tree
Showing 2 changed files with 139 additions and 128 deletions.
192 changes: 98 additions & 94 deletions dm/dm/worker/task_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,33 +104,13 @@ type TaskStatusChecker interface {
// NewTaskStatusChecker is a TaskStatusChecker initializer.
var NewTaskStatusChecker = NewRealTaskStatusChecker

type backoffController struct {
// task name -> backoff counter
backoffs map[string]*backoff.Backoff

// task name -> task latest paused time that checker observes
latestPausedTime map[string]time.Time

// task name -> task latest block time, block means task paused with un-resumable error
latestBlockTime map[string]time.Time

// task name -> the latest auto resume time
latestResumeTime map[string]time.Time

latestRelayPausedTime time.Time
latestRelayBlockTime time.Time
latestRelayResumeTime time.Time
relayBackoff *backoff.Backoff
}

// newBackoffController returns a new backoffController instance.
func newBackoffController() *backoffController {
return &backoffController{
backoffs: make(map[string]*backoff.Backoff),
latestPausedTime: make(map[string]time.Time),
latestBlockTime: make(map[string]time.Time),
latestResumeTime: make(map[string]time.Time),
}
// AutoResumeInfo contains some Time and Backoff that are related to auto resume.
// One instance tracks the auto resume state of a single unit: the checker keeps
// one per subtask and one for the relay.
// This structure is exposed for DM as library.
type AutoResumeInfo struct {
// Backoff supplies the current waiting duration between two auto resume
// attempts; it is rolled back or forwarded as the unit recovers or is resumed.
Backoff *backoff.Backoff
// LatestPausedTime is the latest time the checker observed the unit as paused.
// It is also reset to the current time after each backoff rollback.
LatestPausedTime time.Time
// LatestBlockTime records when the unit was first found blocked, where block
// means paused with an un-resumable error. It is only set while still zero.
LatestBlockTime time.Time
// LatestResumeTime is the time of the latest auto resume.
LatestResumeTime time.Time
}

// realTaskStatusChecker is not thread-safe.
Expand All @@ -145,16 +125,18 @@ type realTaskStatusChecker struct {
cfg config.CheckerConfig
l log.Logger
w *SourceWorker
bc *backoffController

subtaskAutoResume map[string]*AutoResumeInfo
relayAutoResume *AutoResumeInfo
}

// NewRealTaskStatusChecker creates a new realTaskStatusChecker instance.
func NewRealTaskStatusChecker(cfg config.CheckerConfig, w *SourceWorker) TaskStatusChecker {
tsc := &realTaskStatusChecker{
cfg: cfg,
l: log.With(zap.String("component", "task checker")),
w: w,
bc: newBackoffController(),
cfg: cfg,
l: log.With(zap.String("component", "task checker")),
w: w,
subtaskAutoResume: map[string]*AutoResumeInfo{},
}
tsc.closed.Store(true)
return tsc
Expand Down Expand Up @@ -254,7 +236,18 @@ func isResumableError(err *pb.ProcessError) bool {
return true
}

func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus, duration time.Duration) ResumeStrategy {
// CheckResumeSubtask updates the auto resume info and returns the ResumeStrategy for a subtask.
// When the returned strategy is ResumeDispatch and the caller successfully resumes the subtask,
// the caller should update LatestResumeTime and forward the backoff.
// This function is exposed for DM as library.
func (i *AutoResumeInfo) CheckResumeSubtask(
stStatus *pb.SubTaskStatus,
backoffRollback time.Duration,
) (strategy ResumeStrategy) {
defer func() {
i.update(strategy, backoffRollback)
}()

// task that is not paused or paused manually, just ignore it
if stStatus == nil || stStatus.Stage != pb.Stage_Paused || stStatus.Result == nil || stStatus.Result.IsCanceled {
return ResumeIgnore
Expand All @@ -265,21 +258,28 @@ func (tsc *realTaskStatusChecker) getResumeStrategy(stStatus *pb.SubTaskStatus,
pErr := processErr
if !isResumableError(processErr) {
failpoint.Inject("TaskCheckInterval", func(_ failpoint.Value) {
tsc.l.Info("error is not resumable", zap.Stringer("error", pErr))
log.L().Info("error is not resumable", zap.Stringer("error", pErr))
})
return ResumeNoSense
}
}

// auto resume interval does not exceed backoff duration, skip this paused task
if time.Since(tsc.bc.latestResumeTime[stStatus.Name]) < duration {
if time.Since(i.LatestResumeTime) < i.Backoff.Current() {
return ResumeSkip
}

return ResumeDispatch
}

func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelayStatus, duration time.Duration) ResumeStrategy {
func (i *AutoResumeInfo) checkResumeRelay(
relayStatus *pb.RelayStatus,
backoffRollback time.Duration,
) (strategy ResumeStrategy) {
defer func() {
i.update(strategy, backoffRollback)
}()

// relay that is not paused or paused manually, just ignore it
if relayStatus == nil || relayStatus.Stage != pb.Stage_Paused || relayStatus.Result == nil || relayStatus.Result.IsCanceled {
return ResumeIgnore
Expand All @@ -291,52 +291,64 @@ func (tsc *realTaskStatusChecker) getRelayResumeStrategy(relayStatus *pb.RelaySt
}
}

if time.Since(tsc.bc.latestRelayResumeTime) < duration {
if time.Since(i.LatestResumeTime) < i.Backoff.Current() {
return ResumeSkip
}

return ResumeDispatch
}

func (tsc *realTaskStatusChecker) checkRelayStatus() {
relayStatus := tsc.w.relayHolder.Status(nil)
if tsc.bc.relayBackoff == nil {
tsc.bc.relayBackoff, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration)
tsc.bc.latestRelayPausedTime = time.Now()
tsc.bc.latestRelayResumeTime = time.Now()
}
rbf := tsc.bc.relayBackoff
duration := rbf.Current()
strategy := tsc.getRelayResumeStrategy(relayStatus, duration)
func (i *AutoResumeInfo) update(strategy ResumeStrategy, backoffRollback time.Duration) {
switch strategy {
case ResumeIgnore:
if time.Since(tsc.bc.latestRelayPausedTime) > tsc.cfg.BackoffRollback.Duration {
rbf.Rollback()
if time.Since(i.LatestPausedTime) > backoffRollback {
i.Backoff.Rollback()
// after each rollback, reset this timer
tsc.bc.latestRelayPausedTime = time.Now()
i.LatestPausedTime = time.Now()
}
case ResumeNoSense:
// this strategy doesn't forward or rollback backoff
tsc.bc.latestRelayPausedTime = time.Now()
blockTime := tsc.bc.latestRelayBlockTime
if !blockTime.IsZero() {
tsc.l.Warn("relay can't auto resume", zap.Duration("paused duration", time.Since(blockTime)))
} else {
tsc.bc.latestRelayBlockTime = time.Now()
tsc.l.Warn("relay can't auto resume")
i.LatestPausedTime = time.Now()
if i.LatestBlockTime.IsZero() {
i.LatestBlockTime = time.Now()
}
case ResumeSkip, ResumeDispatch:
i.LatestPausedTime = time.Now()
}
}

func (tsc *realTaskStatusChecker) checkRelayStatus() {
relayStatus := tsc.w.relayHolder.Status(nil)
if tsc.relayAutoResume == nil {
bf, _ := backoff.NewBackoff(
tsc.cfg.BackoffFactor,
tsc.cfg.BackoffJitter,
tsc.cfg.BackoffMin.Duration,
tsc.cfg.BackoffMax.Duration)
tsc.relayAutoResume = &AutoResumeInfo{
Backoff: bf,
LatestResumeTime: time.Now(),
LatestPausedTime: time.Now(),
}
}

strategy := tsc.relayAutoResume.checkResumeRelay(relayStatus, tsc.cfg.BackoffRollback.Duration)
switch strategy {
case ResumeNoSense:
tsc.l.Warn("relay can't auto resume",
zap.Duration("paused duration", time.Since(tsc.relayAutoResume.LatestBlockTime)))
case ResumeSkip:
tsc.l.Warn("backoff skip auto resume relay", zap.Time("latestResumeTime", tsc.bc.latestRelayResumeTime), zap.Duration("duration", duration))
tsc.bc.latestRelayPausedTime = time.Now()
tsc.l.Warn("backoff skip auto resume relay",
zap.Time("latestResumeTime", tsc.relayAutoResume.LatestResumeTime),
zap.Duration("duration", tsc.relayAutoResume.Backoff.Current()))
case ResumeDispatch:
tsc.bc.latestRelayPausedTime = time.Now()
err := tsc.w.operateRelay(tsc.ctx, pb.RelayOp_ResumeRelay)
if err != nil {
tsc.l.Error("dispatch auto resume relay failed", zap.Error(err))
} else {
tsc.l.Info("dispatch auto resume relay")
tsc.bc.latestRelayResumeTime = time.Now()
rbf.BoundaryForward()
tsc.relayAutoResume.LatestResumeTime = time.Now()
tsc.relayAutoResume.Backoff.BoundaryForward()
}
}
}
Expand All @@ -346,57 +358,49 @@ func (tsc *realTaskStatusChecker) checkTaskStatus() {

defer func() {
// cleanup outdated tasks
for taskName := range tsc.bc.backoffs {
for taskName := range tsc.subtaskAutoResume {
_, ok := allSubTaskStatus[taskName]
if !ok {
tsc.l.Debug("remove task from checker", zap.String("task", taskName))
delete(tsc.bc.backoffs, taskName)
delete(tsc.bc.latestPausedTime, taskName)
delete(tsc.bc.latestBlockTime, taskName)
delete(tsc.bc.latestResumeTime, taskName)
delete(tsc.subtaskAutoResume, taskName)
}
}
}()

for taskName, stStatus := range allSubTaskStatus {
bf, ok := tsc.bc.backoffs[taskName]
info, ok := tsc.subtaskAutoResume[taskName]
if !ok {
bf, _ = backoff.NewBackoff(tsc.cfg.BackoffFactor, tsc.cfg.BackoffJitter, tsc.cfg.BackoffMin.Duration, tsc.cfg.BackoffMax.Duration)
tsc.bc.backoffs[taskName] = bf
tsc.bc.latestPausedTime[taskName] = time.Now()
tsc.bc.latestResumeTime[taskName] = time.Now()
bf, _ := backoff.NewBackoff(
tsc.cfg.BackoffFactor,
tsc.cfg.BackoffJitter,
tsc.cfg.BackoffMin.Duration,
tsc.cfg.BackoffMax.Duration)
info = &AutoResumeInfo{
Backoff: bf,
LatestPausedTime: time.Now(),
LatestResumeTime: time.Now(),
}
tsc.subtaskAutoResume[taskName] = info
}
duration := bf.Current()
strategy := tsc.getResumeStrategy(stStatus, duration)
strategy := info.CheckResumeSubtask(stStatus, tsc.cfg.BackoffRollback.Duration)
switch strategy {
case ResumeIgnore:
if time.Since(tsc.bc.latestPausedTime[taskName]) > tsc.cfg.BackoffRollback.Duration {
bf.Rollback()
// after each rollback, reset this timer
tsc.bc.latestPausedTime[taskName] = time.Now()
}
case ResumeNoSense:
// this strategy doesn't forward or rollback backoff
tsc.bc.latestPausedTime[taskName] = time.Now()
blockTime, ok := tsc.bc.latestBlockTime[taskName]
if ok {
tsc.l.Warn("task can't auto resume", zap.String("task", taskName), zap.Duration("paused duration", time.Since(blockTime)))
} else {
tsc.bc.latestBlockTime[taskName] = time.Now()
tsc.l.Warn("task can't auto resume", zap.String("task", taskName))
}
tsc.l.Warn("task can't auto resume",
zap.String("task", taskName),
zap.Duration("paused duration", time.Since(info.LatestBlockTime)))
case ResumeSkip:
tsc.l.Warn("backoff skip auto resume task", zap.String("task", taskName), zap.Time("latestResumeTime", tsc.bc.latestResumeTime[taskName]), zap.Duration("duration", duration))
tsc.bc.latestPausedTime[taskName] = time.Now()
tsc.l.Warn("backoff skip auto resume task",
zap.String("task", taskName),
zap.Time("latestResumeTime", info.LatestResumeTime),
zap.Duration("duration", info.Backoff.Current()))
case ResumeDispatch:
tsc.bc.latestPausedTime[taskName] = time.Now()
err := tsc.w.OperateSubTask(taskName, pb.TaskOp_AutoResume)
if err != nil {
tsc.l.Error("dispatch auto resume task failed", zap.String("task", taskName), zap.Error(err))
} else {
tsc.l.Info("dispatch auto resume task", zap.String("task", taskName))
tsc.bc.latestResumeTime[taskName] = time.Now()
bf.BoundaryForward()
info.LatestResumeTime = time.Now()
info.Backoff.BoundaryForward()
}
}
}
Expand Down
Loading

0 comments on commit bddf162

Please sign in to comment.