Skip to content

Commit

Permalink
poor: resume task if sync unit exits with invalid connection error (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
IANTHEREAL authored Mar 7, 2019
1 parent 1bdcb3e commit f80e6fb
Showing 1 changed file with 42 additions and 8 deletions.
50 changes: 42 additions & 8 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package worker

import (
"strings"
"sync"
"time"

Expand Down Expand Up @@ -154,7 +155,7 @@ func (st *SubTask) Run() {
st.ctx, st.cancel = context.WithCancel(context.Background())
pr := make(chan pb.ProcessResult, 1)
st.wg.Add(1)
go st.fetchResult(st.ctx, st.cancel, pr)
go st.fetchResult(pr)
go cu.Process(st.ctx, pr)

st.wg.Add(1)
Expand All @@ -163,34 +164,50 @@ func (st *SubTask) Run() {

// fetchResult fetches units process result
// when a dm-unit reports an error, we need to re-Process the sub task
func (st *SubTask) fetchResult(ctx context.Context, cancel context.CancelFunc, pr chan pb.ProcessResult) {
func (st *SubTask) fetchResult(pr chan pb.ProcessResult) {
defer st.wg.Done()

retry:
select {
case <-ctx.Done():
case <-st.ctx.Done():
return
case result := <-pr:
st.setResult(&result) // save result
cancel() // dm-unit finished, canceled or error occurred, always cancel processing
st.cancel() // dm-unit finished, canceled or error occurred, always cancel processing

if len(result.Errors) == 0 && st.Stage() == pb.Stage_Paused {
return // paused by external request
}

var stage pb.Stage
var (
cu = st.CurrUnit()
stage pb.Stage
)
if len(result.Errors) == 0 {
if result.IsCanceled {
stage = pb.Stage_Stopped // canceled by user
} else {
stage = pb.Stage_Finished // process finished with no error
}
} else {
/* TODO
it's a poor and very rough retry feature, the main reason is that
the concurrency control of the sub task module is very confusing and needs to be optimized.
After improving its state transition and concurrency control,
I will optimize the implementation of retry feature.
*/
if st.retryErrors(result.Errors, cu) {
log.Warnf("[subtask] %s (%s) retry on error %v, waiting 10 seconds!", st.cfg.Name, cu.Type(), result.Errors)
st.ctx, st.cancel = context.WithCancel(context.Background())
time.Sleep(10 * time.Second)
go cu.Resume(st.ctx, pr)
goto retry
}

stage = pb.Stage_Paused // error occurred, paused
}
st.setStage(stage)

cu := st.CurrUnit()

log.Infof("[subtask] %s dm-unit %s process returned with stage %s, status %s", st.cfg.Name, cu.Type(), stage.String(), st.StatusJSON())

switch stage {
Expand Down Expand Up @@ -391,7 +408,7 @@ func (st *SubTask) Resume() error {
st.ctx, st.cancel = context.WithCancel(context.Background())
pr := make(chan pb.ProcessResult, 1)
st.wg.Add(1)
go st.fetchResult(st.ctx, st.cancel, pr)
go st.fetchResult(pr)
go cu.Resume(st.ctx, pr)

st.wg.Add(1)
Expand Down Expand Up @@ -621,3 +638,20 @@ func (st *SubTask) unitTransWaitCondition() error {
}
return nil
}

// retryErrors reports whether the sub task should be retried after the
// current unit returned the given errors. Only the sync unit is eligible,
// and only when every error message contains "invalid connection".
func (st *SubTask) retryErrors(errors []*pb.ProcessError, current unit.Unit) bool {
	// Units other than the sync unit are never retried.
	if current.Type() != pb.UnitType_Sync {
		return false
	}

	// Scan every error; a single message without the marker vetoes the retry.
	allInvalidConn := true
	for _, processErr := range errors {
		if !strings.Contains(processErr.Msg, "invalid connection") {
			allInvalidConn = false
		}
	}
	return allInvalidConn
}

0 comments on commit f80e6fb

Please sign in to comment.