Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

.*: fix bug that after execute pause-task the task may still running #644

Merged
merged 10 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ ErrWorkerStartService,[code=40048:class=dm-worker:scope=internal:level=high],"st
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high],"worker has not started"
ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high],"worker already closed"
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high],"worker already started"
ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high],"current stage is not running not valid"
ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high],"current stage is not paused not valid"
ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high],"current stage is %s but not running, invalid"
ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high],"current stage is %s but not paused, invalid"
ErrWorkerUpdateTaskStage,[code=40052:class=dm-worker:scope=internal:level=high],"can only update task on Paused stage, but current stage is %s"
ErrWorkerMigrateStopRelay,[code=40053:class=dm-worker:scope=internal:level=high],"relay unit has stopped, can not be migrated"
ErrWorkerSubTaskNotFound,[code=40054:class=dm-worker:scope=internal:level=high],"sub task with name %s not found"
Expand Down
261 changes: 134 additions & 127 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ enum Stage {
Paused = 3;
Stopped = 4;
Finished = 5;

Pausing = 6;
Resuming = 7;
}

// CheckStatus represents status for check unit
Expand Down
9 changes: 9 additions & 0 deletions dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package unit

import (
"context"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -77,3 +78,11 @@ func NewProcessError(err error) *pb.ProcessError {
}
return result
}

// IsCtxCanceledProcessErr returns true if the err's context canceled
func IsCtxCanceledProcessErr(err *pb.ProcessError) bool {
if strings.Contains(err.Msg, "context canceled") {
return true
}
return false
}
33 changes: 33 additions & 0 deletions dm/unit/unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package unit

import (
"context"

"github.com/pingcap/check"
"github.com/pingcap/errors"
)

var _ = check.Suite(&testUnitSuite{})

type testUnitSuite struct{}

func (t *testUnitSuite) TestIsCtxCanceledProcessErr(c *check.C) {
err := NewProcessError(context.Canceled)
c.Assert(IsCtxCanceledProcessErr(err), check.IsTrue)

err = NewProcessError(errors.New("123"))
c.Assert(IsCtxCanceledProcessErr(err), check.IsFalse)
}
33 changes: 23 additions & 10 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,25 @@ func (st *SubTask) callCurrCancel() {
func (st *SubTask) fetchResult(pr chan pb.ProcessResult) {
defer st.wg.Done()

st.RLock()
ctx := st.currCtx
st.RUnlock()

select {
case <-ctx.Done():
case <-st.ctx.Done():
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
// should not use st.currCtx, because will do st.currCancel when Pause task,
// and this function will return, and the unit's Process maybe still running.
return
case result := <-pr:
// filter the context canceled error
errs := make([]*pb.ProcessError, 0, 2)
for _, err := range result.Errors {
if !unit.IsCtxCanceledProcessErr(err) {
errs = append(errs, err)
}
}
result.Errors = errs

st.setResult(&result) // save result
st.callCurrCancel() // dm-unit finished, canceled or error occurred, always cancel processing

if len(result.Errors) == 0 && st.Stage() == pb.Stage_Paused {
if len(result.Errors) == 0 && st.Stage() == pb.Stage_Pausing {
return // paused by external request
}

Expand Down Expand Up @@ -371,6 +378,7 @@ func (st *SubTask) setStage(stage pb.Stage) {
func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool {
st.Lock()
defer st.Unlock()

if st.stage == oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
Expand Down Expand Up @@ -428,8 +436,8 @@ func (st *SubTask) Close() {

// Pause pauses the running sub task
func (st *SubTask) Pause() error {
if !st.stageCAS(pb.Stage_Running, pb.Stage_Paused) {
return terror.ErrWorkerNotRunningStage.Generate()
if !st.stageCAS(pb.Stage_Running, pb.Stage_Pausing) {
return terror.ErrWorkerNotRunningStage.Generate(st.Stage().String())
}

st.callCurrCancel()
Expand All @@ -439,6 +447,7 @@ func (st *SubTask) Pause() error {
cu.Pause()

st.l.Info("paused", zap.Stringer("unit", cu.Type()))
st.setStage(pb.Stage_Paused)
return nil
}

Expand All @@ -450,9 +459,10 @@ func (st *SubTask) Resume() error {
return nil
}

if !st.stageCAS(pb.Stage_Paused, pb.Stage_Running) {
return terror.ErrWorkerNotPausedStage.Generate()
if !st.stageCAS(pb.Stage_Paused, pb.Stage_Resuming) {
return terror.ErrWorkerNotPausedStage.Generate(st.Stage().String())
}

ctx, cancel := context.WithCancel(st.ctx)
st.setCurrCtx(ctx, cancel)
// NOTE: this may block if user resume a task
Expand All @@ -462,6 +472,7 @@ func (st *SubTask) Resume() error {
st.setStage(pb.Stage_Paused)
return err
} else if ctx.Err() != nil {
st.setStage(pb.Stage_Paused)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be set to pb.Stage_Paused, see #589 (comment)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I remove this and add a comment e34ec49

return nil
}

Expand All @@ -473,6 +484,8 @@ func (st *SubTask) Resume() error {
st.wg.Add(1)
go st.fetchResult(pr)
go cu.Resume(ctx, pr)

st.setStage(pb.Stage_Running)
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,13 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

isCanceled := false
if len(errs) == 0 {
select {
case <-ctx.Done():
isCanceled = true
default:
}
} else {
select {
case <-ctx.Done():
isCanceled = true
default:
}

if len(errs) != 0 {
// pause because of error occurred
l.Pause()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,8 @@ var (
ErrWorkerNoStart = New(codeWorkerNoStart, ClassDMWorker, ScopeInternal, LevelHigh, "worker has not started")
ErrWorkerAlreadyClosed = New(codeWorkerAlreadyClosed, ClassDMWorker, ScopeInternal, LevelHigh, "worker already closed")
ErrWorkerAlreadyStart = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "worker already started")
ErrWorkerNotRunningStage = New(codeWorkerNotRunningStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is not running not valid")
ErrWorkerNotPausedStage = New(codeWorkerNotPausedStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is not paused not valid")
ErrWorkerNotRunningStage = New(codeWorkerNotRunningStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is %s but not running, invalid")
ErrWorkerNotPausedStage = New(codeWorkerNotPausedStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is %s but not paused, invalid")
ErrWorkerUpdateTaskStage = New(codeWorkerUpdateTaskStage, ClassDMWorker, ScopeInternal, LevelHigh, "can only update task on Paused stage, but current stage is %s")
ErrWorkerMigrateStopRelay = New(codeWorkerMigrateStopRelay, ClassDMWorker, ScopeInternal, LevelHigh, "relay unit has stopped, can not be migrated")
ErrWorkerSubTaskNotFound = New(codeWorkerSubTaskNotFound, ClassDMWorker, ScopeInternal, LevelHigh, "sub task with name %s not found")
Expand Down
14 changes: 7 additions & 7 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,13 +551,13 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

isCanceled := false
if len(errs) == 0 {
select {
case <-ctx.Done():
isCanceled = true
default:
}
} else {
select {
case <-ctx.Done():
isCanceled = true
default:
}

if len(errs) != 0 {
// pause because of error occurred
s.Pause()
}
Expand Down
13 changes: 11 additions & 2 deletions tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,21 @@ function run() {
# `FIXME: the following case is not supported automatically now, try to support it later`
# so we try to do this `pause-task` and `resume-task` in the case now.
sleep 3
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test"\
"\"result\": true" 3
# pause twice
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test"\
"\"result\": true" 3
# wait really paused
# FIXME: `if !st.stageCAS(pb.Stage_Running, pb.Stage_Paused)` in `subtask.go` is not enough to indicate the real stage.
sleep 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Paused" 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test"\
"\"result\": true" 3
# resume twice
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test to test stop-task twice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add in ecd45e6

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test"\
"\"result\": true" 3
Expand Down