-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
disttask: fix failed step is taken as success #49971
Changes from 5 commits
0f3e1ce
7f4294b
ef92320
0948aad
2cab49f
79f6153
2948acb
6d9a55a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -59,7 +59,7 @@ type TaskManager interface { | |||||
// we only consider pending/running subtasks, subtasks related to revert are | ||||||
// not considered. | ||||||
GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error) | ||||||
GetSubtaskInStatesCnt(ctx context.Context, taskID int64, states ...proto.SubtaskState) (int64, error) | ||||||
GetSubtaskCntByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Indeed, there is some confusion.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. GetSubtaskCntGroupByStates? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||||||
ResumeSubtasks(ctx context.Context, taskID int64) error | ||||||
CollectSubTaskError(ctx context.Context, taskID int64) ([]error, error) | ||||||
TransferSubTasks2History(ctx context.Context, taskID int64) error | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -240,12 +240,13 @@ func (s *BaseScheduler) onCancelling() error { | |||||
// handle task in pausing state, cancel all running subtasks. | ||||||
func (s *BaseScheduler) onPausing() error { | ||||||
logutil.Logger(s.logCtx).Info("on pausing state", zap.Stringer("state", s.Task.State), zap.Int64("step", int64(s.Task.Step))) | ||||||
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStateRunning, proto.SubtaskStatePending) | ||||||
cntByStates, err := s.taskMgr.GetSubtaskCntByStates(s.ctx, s.Task.ID, s.Task.Step) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
if err != nil { | ||||||
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err)) | ||||||
return err | ||||||
} | ||||||
if cnt == 0 { | ||||||
runningPendingCnt := cntByStates[proto.SubtaskStateRunning] + cntByStates[proto.SubtaskStatePending] | ||||||
if runningPendingCnt == 0 { | ||||||
logutil.Logger(s.logCtx).Info("all running subtasks paused, update the task to paused state") | ||||||
return s.updateTask(proto.TaskStatePaused, nil, RetrySQLTimes) | ||||||
} | ||||||
|
@@ -273,12 +274,12 @@ var TestSyncChan = make(chan struct{}) | |||||
// handle task in resuming state. | ||||||
func (s *BaseScheduler) onResuming() error { | ||||||
logutil.Logger(s.logCtx).Info("on resuming state", zap.Stringer("state", s.Task.State), zap.Int64("step", int64(s.Task.Step))) | ||||||
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStatePaused) | ||||||
cntByStates, err := s.taskMgr.GetSubtaskCntByStates(s.ctx, s.Task.ID, s.Task.Step) | ||||||
if err != nil { | ||||||
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err)) | ||||||
return err | ||||||
} | ||||||
if cnt == 0 { | ||||||
if cntByStates[proto.SubtaskStatePaused] == 0 { | ||||||
// Finish the resuming process. | ||||||
logutil.Logger(s.logCtx).Info("all paused tasks converted to pending state, update the task to running state") | ||||||
err := s.updateTask(proto.TaskStateRunning, nil, RetrySQLTimes) | ||||||
|
@@ -294,12 +295,13 @@ func (s *BaseScheduler) onResuming() error { | |||||
// handle task in reverting state, check all revert subtasks finishes. | ||||||
func (s *BaseScheduler) onReverting() error { | ||||||
logutil.Logger(s.logCtx).Debug("on reverting state", zap.Stringer("state", s.Task.State), zap.Int64("step", int64(s.Task.Step))) | ||||||
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStateRevertPending, proto.SubtaskStateReverting) | ||||||
cntByStates, err := s.taskMgr.GetSubtaskCntByStates(s.ctx, s.Task.ID, s.Task.Step) | ||||||
if err != nil { | ||||||
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err)) | ||||||
return err | ||||||
} | ||||||
if cnt == 0 { | ||||||
activeRevertCnt := cntByStates[proto.SubtaskStateRevertPending] + cntByStates[proto.SubtaskStateReverting] | ||||||
if activeRevertCnt == 0 { | ||||||
if err = s.OnDone(s.ctx, s, s.Task); err != nil { | ||||||
return errors.Trace(err) | ||||||
} | ||||||
|
@@ -323,23 +325,23 @@ func (s *BaseScheduler) onRunning() error { | |||||
logutil.Logger(s.logCtx).Debug("on running state", | ||||||
zap.Stringer("state", s.Task.State), | ||||||
zap.Int64("step", int64(s.Task.Step))) | ||||||
subTaskErrs, err := s.taskMgr.CollectSubTaskError(s.ctx, s.Task.ID) | ||||||
if err != nil { | ||||||
logutil.Logger(s.logCtx).Warn("collect subtask error failed", zap.Error(err)) | ||||||
return err | ||||||
} | ||||||
if len(subTaskErrs) > 0 { | ||||||
logutil.Logger(s.logCtx).Warn("subtasks encounter errors") | ||||||
return s.onErrHandlingStage(subTaskErrs) | ||||||
} | ||||||
// check current step finishes. | ||||||
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStatePending, proto.SubtaskStateRunning) | ||||||
cntByStates, err := s.taskMgr.GetSubtaskCntByStates(s.ctx, s.Task.ID, s.Task.Step) | ||||||
if err != nil { | ||||||
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err)) | ||||||
return err | ||||||
} | ||||||
|
||||||
if cnt == 0 { | ||||||
if cntByStates[proto.SubtaskStateFailed] > 0 || cntByStates[proto.SubtaskStateCanceled] > 0 { | ||||||
subTaskErrs, err := s.taskMgr.CollectSubTaskError(s.ctx, s.Task.ID) | ||||||
if err != nil { | ||||||
logutil.Logger(s.logCtx).Warn("collect subtask error failed", zap.Error(err)) | ||||||
return err | ||||||
} | ||||||
if len(subTaskErrs) > 0 { | ||||||
logutil.Logger(s.logCtx).Warn("subtasks encounter errors") | ||||||
return s.onErrHandlingStage(subTaskErrs) | ||||||
} | ||||||
} else if s.isStepSucceed(cntByStates) { | ||||||
return s.switch2NextStep() | ||||||
} | ||||||
|
||||||
|
@@ -727,6 +729,11 @@ func (s *BaseScheduler) WithNewTxn(ctx context.Context, fn func(se sessionctx.Co | |||||
return s.taskMgr.WithNewTxn(ctx, fn) | ||||||
} | ||||||
|
||||||
// isStepSucceed reports whether the given per-state subtask counts describe a | ||||||
// step that has succeeded. It returns true when cntByStates is empty (a nil | ||||||
// map has length 0 as well), or when its only key is proto.SubtaskStateSucceed; | ||||||
// any other key in the map makes it return false. | ||||||
func (*BaseScheduler) isStepSucceed(cntByStates map[proto.SubtaskState]int64) bool { | ||||||
D3Hunter marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
// Only the presence of the succeed key is checked; the count stored under it | ||||||
// is not inspected. | ||||||
_, ok := cntByStates[proto.SubtaskStateSucceed] | ||||||
return len(cntByStates) == 0 || (len(cntByStates) == 1 && ok) | ||||||
} | ||||||
|
||||||
// IsCancelledErr checks if the error is a cancelled error. | ||||||
func IsCancelledErr(err error) bool { | ||||||
return strings.Contains(err.Error(), taskCancelMsg) | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,40 @@ | ||||||
// Copyright 2023 PingCAP, Inc. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
// | ||||||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
// you may not use this file except in compliance with the License. | ||||||
// You may obtain a copy of the License at | ||||||
// | ||||||
// http://www.apache.org/licenses/LICENSE-2.0 | ||||||
// | ||||||
// Unless required by applicable law or agreed to in writing, software | ||||||
// distributed under the License is distributed on an "AS IS" BASIS, | ||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
// See the License for the specific language governing permissions and | ||||||
// limitations under the License. | ||||||
|
||||||
package scheduler | ||||||
|
||||||
import ( | ||||||
"testing" | ||||||
|
||||||
"github.com/pingcap/tidb/pkg/disttask/framework/proto" | ||||||
"github.com/stretchr/testify/require" | ||||||
) | ||||||
|
||||||
// TestSchedulerIsStepSucceed checks BaseScheduler.isStepSucceed on a | ||||||
// zero-value scheduler: a nil map, an empty map, and a map holding only the | ||||||
// succeed state must be reported as succeeded, while a map holding only a | ||||||
// canceled, failed, or reverting state must not. | ||||||
func TestSchedulerIsStepSucceed(t *testing.T) { | ||||||
s := &BaseScheduler{} | ||||||
// Cases expected to count as a succeeded step. | ||||||
require.True(t, s.isStepSucceed(nil)) | ||||||
require.True(t, s.isStepSucceed(map[proto.SubtaskState]int64{})) | ||||||
require.True(t, s.isStepSucceed(map[proto.SubtaskState]int64{ | ||||||
proto.SubtaskStateSucceed: 1, | ||||||
})) | ||||||
// A single entry for any of these states must not count as success. | ||||||
for _, state := range []proto.SubtaskState{ | ||||||
proto.SubtaskStateCanceled, | ||||||
proto.SubtaskStateFailed, | ||||||
proto.SubtaskStateReverting, | ||||||
} { | ||||||
require.False(t, s.isStepSucceed(map[proto.SubtaskState]int64{ | ||||||
state: 1, | ||||||
})) | ||||||
} | ||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also in planner_test