Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

.*: fix bug that the task may still be running after executing pause-task #644

Merged
merged 10 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,8 @@ ErrWorkerStartService,[code=40048:class=dm-worker:scope=internal:level=high],"st
ErrWorkerNoStart,[code=40070:class=dm-worker:scope=internal:level=high],"worker has not started"
ErrWorkerAlreadyClosed,[code=40049:class=dm-worker:scope=internal:level=high],"worker already closed"
ErrWorkerAlreadyStart,[code=40071:class=dm-worker:scope=internal:level=high],"worker already started"
ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high],"current stage is not running not valid"
ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high],"current stage is not paused not valid"
ErrWorkerNotRunningStage,[code=40050:class=dm-worker:scope=internal:level=high],"current stage is %s but not running, invalid"
ErrWorkerNotPausedStage,[code=40051:class=dm-worker:scope=internal:level=high],"current stage is %s but not paused, invalid"
ErrWorkerUpdateTaskStage,[code=40052:class=dm-worker:scope=internal:level=high],"can only update task on Paused stage, but current stage is %s"
ErrWorkerMigrateStopRelay,[code=40053:class=dm-worker:scope=internal:level=high],"relay unit has stopped, can not be migrated"
ErrWorkerSubTaskNotFound,[code=40054:class=dm-worker:scope=internal:level=high],"sub task with name %s not found"
Expand Down
261 changes: 134 additions & 127 deletions dm/pb/dmworker.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions dm/proto/dmworker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ enum Stage {
Paused = 3;
Stopped = 4;
Finished = 5;

Pausing = 6;
Resuming = 7;
}

// CheckStatus represents status for check unit
Expand Down
9 changes: 9 additions & 0 deletions dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package unit

import (
"context"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand Down Expand Up @@ -77,3 +78,11 @@ func NewProcessError(err error) *pb.ProcessError {
}
return result
}

// IsCtxCanceledProcessErr reports whether err describes a canceled context.
//
// The check is a plain substring match for "context canceled" in err.Msg.
// A nil err is reported as not canceled rather than causing a nil-pointer
// dereference.
func IsCtxCanceledProcessErr(err *pb.ProcessError) bool {
	return err != nil && strings.Contains(err.Msg, "context canceled")
}
33 changes: 33 additions & 0 deletions dm/unit/unit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package unit

import (
"context"

"github.com/pingcap/check"
"github.com/pingcap/errors"
)

var _ = check.Suite(&testUnitSuite{})

type testUnitSuite struct{}

// TestIsCtxCanceledProcessErr verifies that a ProcessError built from
// context.Canceled is classified as canceled, while one built from an
// unrelated error is not.
func (t *testUnitSuite) TestIsCtxCanceledProcessErr(c *check.C) {
	cases := []struct {
		cause    error
		canceled bool
	}{
		{cause: context.Canceled, canceled: true},
		{cause: errors.New("123"), canceled: false},
	}

	for _, tc := range cases {
		processErr := NewProcessError(tc.cause)
		c.Assert(IsCtxCanceledProcessErr(processErr), check.Equals, tc.canceled)
	}
}
34 changes: 24 additions & 10 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,18 +236,25 @@ func (st *SubTask) callCurrCancel() {
func (st *SubTask) fetchResult(pr chan pb.ProcessResult) {
defer st.wg.Done()

st.RLock()
ctx := st.currCtx
st.RUnlock()

select {
case <-ctx.Done():
case <-st.ctx.Done():
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
// do not wait on st.currCtx here: pausing the task cancels it via st.currCancel,
// which would make this function return while the unit's Process may still be running.
return
case result := <-pr:
// filter the context canceled error
errs := make([]*pb.ProcessError, 0, 2)
for _, err := range result.Errors {
if !unit.IsCtxCanceledProcessErr(err) {
errs = append(errs, err)
}
}
result.Errors = errs

st.setResult(&result) // save result
st.callCurrCancel() // dm-unit finished, canceled or error occurred, always cancel processing

if len(result.Errors) == 0 && st.Stage() == pb.Stage_Paused {
if len(result.Errors) == 0 && st.Stage() == pb.Stage_Pausing {
return // paused by external request
}

Expand Down Expand Up @@ -371,6 +378,7 @@ func (st *SubTask) setStage(stage pb.Stage) {
func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool {
st.Lock()
defer st.Unlock()

if st.stage == oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name).Set(float64(st.stage))
Expand Down Expand Up @@ -428,8 +436,8 @@ func (st *SubTask) Close() {

// Pause pauses the running sub task
func (st *SubTask) Pause() error {
if !st.stageCAS(pb.Stage_Running, pb.Stage_Paused) {
return terror.ErrWorkerNotRunningStage.Generate()
if !st.stageCAS(pb.Stage_Running, pb.Stage_Pausing) {
return terror.ErrWorkerNotRunningStage.Generate(st.Stage().String())
}

st.callCurrCancel()
Expand All @@ -439,6 +447,7 @@ func (st *SubTask) Pause() error {
cu.Pause()

st.l.Info("paused", zap.Stringer("unit", cu.Type()))
st.setStage(pb.Stage_Paused)
return nil
}

Expand All @@ -450,9 +459,10 @@ func (st *SubTask) Resume() error {
return nil
}

if !st.stageCAS(pb.Stage_Paused, pb.Stage_Running) {
return terror.ErrWorkerNotPausedStage.Generate()
if !st.stageCAS(pb.Stage_Paused, pb.Stage_Resuming) {
return terror.ErrWorkerNotPausedStage.Generate(st.Stage().String())
}

ctx, cancel := context.WithCancel(st.ctx)
st.setCurrCtx(ctx, cancel)
// NOTE: this may block if user resume a task
Expand All @@ -462,6 +472,8 @@ func (st *SubTask) Resume() error {
st.setStage(pb.Stage_Paused)
return err
} else if ctx.Err() != nil {
// ctx.Err() != nil means this context was canceled by another goroutine;
// that goroutine will change the stage, so there is no need to set the stage to Paused here.
return nil
}

Expand All @@ -473,6 +485,8 @@ func (st *SubTask) Resume() error {
st.wg.Add(1)
go st.fetchResult(pr)
go cu.Resume(ctx, pr)

st.setStage(pb.Stage_Running)
return nil
}

Expand Down
14 changes: 7 additions & 7 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,13 +505,13 @@ func (l *Loader) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

isCanceled := false
if len(errs) == 0 {
select {
case <-ctx.Done():
isCanceled = true
default:
}
} else {
select {
case <-ctx.Done():
isCanceled = true
default:
}

if len(errs) != 0 {
// pause because of error occurred
l.Pause()
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,8 @@ var (
ErrWorkerNoStart = New(codeWorkerNoStart, ClassDMWorker, ScopeInternal, LevelHigh, "worker has not started")
ErrWorkerAlreadyClosed = New(codeWorkerAlreadyClosed, ClassDMWorker, ScopeInternal, LevelHigh, "worker already closed")
ErrWorkerAlreadyStart = New(codeWorkerAlreadyStarted, ClassDMWorker, ScopeInternal, LevelHigh, "worker already started")
ErrWorkerNotRunningStage = New(codeWorkerNotRunningStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is not running not valid")
ErrWorkerNotPausedStage = New(codeWorkerNotPausedStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is not paused not valid")
ErrWorkerNotRunningStage = New(codeWorkerNotRunningStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is %s but not running, invalid")
ErrWorkerNotPausedStage = New(codeWorkerNotPausedStage, ClassDMWorker, ScopeInternal, LevelHigh, "current stage is %s but not paused, invalid")
ErrWorkerUpdateTaskStage = New(codeWorkerUpdateTaskStage, ClassDMWorker, ScopeInternal, LevelHigh, "can only update task on Paused stage, but current stage is %s")
ErrWorkerMigrateStopRelay = New(codeWorkerMigrateStopRelay, ClassDMWorker, ScopeInternal, LevelHigh, "relay unit has stopped, can not be migrated")
ErrWorkerSubTaskNotFound = New(codeWorkerSubTaskNotFound, ClassDMWorker, ScopeInternal, LevelHigh, "sub task with name %s not found")
Expand Down
14 changes: 7 additions & 7 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,13 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

isCanceled := false
if len(errs) == 0 {
select {
case <-ctx.Done():
isCanceled = true
default:
}
} else {
select {
case <-ctx.Done():
isCanceled = true
default:
}

if len(errs) != 0 {
// pause because of error occurred
s.Pause()
}
Expand Down
22 changes: 20 additions & 2 deletions tests/sharding/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,22 @@ function run() {
# `FIXME: the following case is not supported automatically now, try to support it later`
# so we try to do this `pause-task` and `resume-task` in the case now.
sleep 3
# pause twice, to also verify that a repeated pause-task still succeeds
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test"\
"\"result\": true" 3
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"pause-task test"\
"\"result\": true" 3
# wait until the task is really paused
# FIXME: `if !st.stageCAS(pb.Stage_Running, pb.Stage_Paused)` in `subtask.go` is not enough to indicate the real stage.
sleep 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"Paused" 2

# resume twice, to also verify that a repeated resume-task still succeeds
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test"\
"\"result\": true" 3
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"resume-task test"\
"\"result\": true" 3
Expand Down Expand Up @@ -128,6 +138,14 @@ function run() {
new_checksum=$(checksum)
echo "checksum before drop/truncate: $old_checksum, checksum after drop/truncate: $new_checksum"
[ "$old_checksum" == "$new_checksum" ]

# stop twice, to also verify that a repeated stop-task reports the task no longer exists
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"\
"\"result\": true" 3
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test"\
"task test has no source or not exist" 1
}

cleanup_data db_target
Expand Down