Skip to content

Commit

Permalink
disttask: fix failed step is taken as success (#49971)
Browse files Browse the repository at this point in the history
close #49950
  • Loading branch information
D3Hunter authored Jan 2, 2024
1 parent ca816f8 commit 99f0349
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 92 deletions.
4 changes: 2 additions & 2 deletions pkg/disttask/framework/framework_pause_and_resume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ func CheckSubtasksState(ctx context.Context, t *testing.T, taskID int64, state p
mgr, err := storage.GetTaskManager()
require.NoError(t, err)
mgr.PrintSubtaskInfo(ctx, taskID)
cnt, err := mgr.GetSubtaskInStatesCnt(ctx, taskID, state)
cntByStates, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepTwo)
require.NoError(t, err)
historySubTasksCnt, err := storage.GetSubtasksFromHistoryByTaskIDForTest(ctx, mgr, taskID)
require.NoError(t, err)
require.Equal(t, expectedCnt, cnt+int64(historySubTasksCnt))
require.Equal(t, expectedCnt, cntByStates[state]+int64(historySubTasksCnt))
}

func TestFrameworkPauseAndResume(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_test(
":handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/testkit",
"//pkg/util/backoff",
"@com_github_ngaut_pools//:pools",
Expand Down
3 changes: 3 additions & 0 deletions pkg/disttask/framework/handle/handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/stretchr/testify/require"
Expand All @@ -52,6 +53,8 @@ func TestHandle(t *testing.T) {
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)

testutil.WaitNodeRegistered(ctx, t)

// no scheduler registered
task, err := handle.SubmitTask(ctx, "1", proto.TaskTypeExample, 2, []byte("byte"))
require.NoError(t, err)
Expand Down
19 changes: 7 additions & 12 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/disttask/framework/planner/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ go_test(
":planner",
"//pkg/disttask/framework/mock",
"//pkg/disttask/framework/storage",
"//pkg/disttask/framework/testutil",
"//pkg/kv",
"//pkg/testkit",
"@com_github_ngaut_pools//:pools",
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/planner"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
Expand All @@ -45,7 +46,7 @@ func TestPlanner(t *testing.T) {
defer pool.Close()
mgr := storage.NewTaskManager(pool)
storage.SetTaskManager(mgr)

testutil.WaitNodeRegistered(ctx, t)
p := &planner.Planner{}
pCtx := planner.PlanCtx{
Ctx: ctx,
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ go_test(
"nodes_test.go",
"rebalance_test.go",
"scheduler_manager_test.go",
"scheduler_nokit_test.go",
"scheduler_test.go",
"slots_test.go",
],
embed = [":scheduler"],
flaky = True,
race = "off",
shard_count = 25,
shard_count = 26,
deps = [
"//pkg/config",
"//pkg/disttask/framework/mock",
Expand Down
3 changes: 2 additions & 1 deletion pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type TaskManager interface {
// we only consider pending/running subtasks, subtasks related to revert are
// not considered.
GetUsedSlotsOnNodes(ctx context.Context) (map[string]int, error)
GetSubtaskInStatesCnt(ctx context.Context, taskID int64, states ...proto.SubtaskState) (int64, error)
// GetSubtaskCntGroupByStates returns the count of subtasks of some step group by state.
GetSubtaskCntGroupByStates(ctx context.Context, taskID int64, step proto.Step) (map[proto.SubtaskState]int64, error)
ResumeSubtasks(ctx context.Context, taskID int64) error
CollectSubTaskError(ctx context.Context, taskID int64) ([]error, error)
TransferSubTasks2History(ctx context.Context, taskID int64) error
Expand Down
43 changes: 25 additions & 18 deletions pkg/disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,13 @@ func (s *BaseScheduler) onCancelling() error {
// handle task in pausing state, cancel all running subtasks.
func (s *BaseScheduler) onPausing() error {
logutil.Logger(s.logCtx).Info("on pausing state", zap.Stringer("state", s.Task.State), zap.Int64("step", int64(s.Task.Step)))
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStateRunning, proto.SubtaskStatePending)
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, s.Task.ID, s.Task.Step)
if err != nil {
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err))
return err
}
if cnt == 0 {
runningPendingCnt := cntByStates[proto.SubtaskStateRunning] + cntByStates[proto.SubtaskStatePending]
if runningPendingCnt == 0 {
logutil.Logger(s.logCtx).Info("all running subtasks paused, update the task to paused state")
return s.updateTask(proto.TaskStatePaused, nil, RetrySQLTimes)
}
Expand Down Expand Up @@ -273,12 +274,12 @@ var TestSyncChan = make(chan struct{})
// handle task in resuming state.
func (s *BaseScheduler) onResuming() error {
logutil.Logger(s.logCtx).Info("on resuming state", zap.Stringer("state", s.Task.State), zap.Int64("step", int64(s.Task.Step)))
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStatePaused)
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, s.Task.ID, s.Task.Step)
if err != nil {
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err))
return err
}
if cnt == 0 {
if cntByStates[proto.SubtaskStatePaused] == 0 {
// Finish the resuming process.
logutil.Logger(s.logCtx).Info("all paused tasks converted to pending state, update the task to running state")
err := s.updateTask(proto.TaskStateRunning, nil, RetrySQLTimes)
Expand All @@ -294,12 +295,13 @@ func (s *BaseScheduler) onResuming() error {
// handle task in reverting state, check all revert subtasks finishes.
func (s *BaseScheduler) onReverting() error {
logutil.Logger(s.logCtx).Debug("on reverting state", zap.Stringer("state", s.Task.State), zap.Int64("step", int64(s.Task.Step)))
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStateRevertPending, proto.SubtaskStateReverting)
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, s.Task.ID, s.Task.Step)
if err != nil {
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err))
return err
}
if cnt == 0 {
activeRevertCnt := cntByStates[proto.SubtaskStateRevertPending] + cntByStates[proto.SubtaskStateReverting]
if activeRevertCnt == 0 {
if err = s.OnDone(s.ctx, s, s.Task); err != nil {
return errors.Trace(err)
}
Expand All @@ -323,23 +325,23 @@ func (s *BaseScheduler) onRunning() error {
logutil.Logger(s.logCtx).Debug("on running state",
zap.Stringer("state", s.Task.State),
zap.Int64("step", int64(s.Task.Step)))
subTaskErrs, err := s.taskMgr.CollectSubTaskError(s.ctx, s.Task.ID)
if err != nil {
logutil.Logger(s.logCtx).Warn("collect subtask error failed", zap.Error(err))
return err
}
if len(subTaskErrs) > 0 {
logutil.Logger(s.logCtx).Warn("subtasks encounter errors")
return s.onErrHandlingStage(subTaskErrs)
}
// check current step finishes.
cnt, err := s.taskMgr.GetSubtaskInStatesCnt(s.ctx, s.Task.ID, proto.SubtaskStatePending, proto.SubtaskStateRunning)
cntByStates, err := s.taskMgr.GetSubtaskCntGroupByStates(s.ctx, s.Task.ID, s.Task.Step)
if err != nil {
logutil.Logger(s.logCtx).Warn("check task failed", zap.Error(err))
return err
}

if cnt == 0 {
if cntByStates[proto.SubtaskStateFailed] > 0 || cntByStates[proto.SubtaskStateCanceled] > 0 {
subTaskErrs, err := s.taskMgr.CollectSubTaskError(s.ctx, s.Task.ID)
if err != nil {
logutil.Logger(s.logCtx).Warn("collect subtask error failed", zap.Error(err))
return err
}
if len(subTaskErrs) > 0 {
logutil.Logger(s.logCtx).Warn("subtasks encounter errors")
return s.onErrHandlingStage(subTaskErrs)
}
} else if s.isStepSucceed(cntByStates) {
return s.switch2NextStep()
}

Expand Down Expand Up @@ -727,6 +729,11 @@ func (s *BaseScheduler) WithNewTxn(ctx context.Context, fn func(se sessionctx.Co
return s.taskMgr.WithNewTxn(ctx, fn)
}

func (*BaseScheduler) isStepSucceed(cntByStates map[proto.SubtaskState]int64) bool {
_, ok := cntByStates[proto.SubtaskStateSucceed]
return len(cntByStates) == 0 || (len(cntByStates) == 1 && ok)
}

// IsCancelledErr checks if the error is a cancelled error.
func IsCancelledErr(err error) bool {
return strings.Contains(err.Error(), taskCancelMsg)
Expand Down
7 changes: 4 additions & 3 deletions pkg/disttask/framework/scheduler/scheduler_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/testutil"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/util"
Expand All @@ -43,9 +44,9 @@ func TestCleanUpRoutine(t *testing.T) {

sch, mgr := MockSchedulerManager(t, ctrl, pool, getNumberExampleSchedulerExt(ctrl), mockCleanupRoutine)
mockCleanupRoutine.EXPECT().CleanUp(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
require.NoError(t, mgr.StartManager(ctx, ":4000", ""))
sch.Start()
defer sch.Stop()
testutil.WaitNodeRegistered(ctx, t)
taskID, err := mgr.CreateTask(ctx, "test", proto.TaskTypeExample, 1, nil)
require.NoError(t, err)

Expand All @@ -62,9 +63,9 @@ func TestCleanUpRoutine(t *testing.T) {

checkSubtaskCnt := func(tasks []*proto.Task, taskID int64) {
require.Eventually(t, func() bool {
cnt, err := mgr.GetSubtaskInStatesCnt(ctx, taskID, proto.SubtaskStatePending)
cntByStates, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepOne)
require.NoError(t, err)
return int64(subtaskCnt) == cnt
return int64(subtaskCnt) == cntByStates[proto.SubtaskStatePending]
}, time.Second, 50*time.Millisecond)
}

Expand Down
40 changes: 40 additions & 0 deletions pkg/disttask/framework/scheduler/scheduler_nokit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package scheduler

import (
"testing"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/stretchr/testify/require"
)

func TestSchedulerIsStepSucceed(t *testing.T) {
s := &BaseScheduler{}
require.True(t, s.isStepSucceed(nil))
require.True(t, s.isStepSucceed(map[proto.SubtaskState]int64{}))
require.True(t, s.isStepSucceed(map[proto.SubtaskState]int64{
proto.SubtaskStateSucceed: 1,
}))
for _, state := range []proto.SubtaskState{
proto.SubtaskStateCanceled,
proto.SubtaskStateFailed,
proto.SubtaskStateReverting,
} {
require.False(t, s.isStepSucceed(map[proto.SubtaskState]int64{
state: 1,
}))
}
}
8 changes: 5 additions & 3 deletions pkg/disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ func TestTaskFailInManager(t *testing.T) {
schManager.Start()
defer schManager.Stop()

testutil.WaitNodeRegistered(ctx, t)

// unknown task type
taskID, err := mgr.CreateTask(ctx, "test", "test-type", 1, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -309,9 +311,9 @@ func checkDispatch(t *testing.T, taskCnt int, isSucc, isCancel, isSubtaskCancel,
for i, taskID := range taskIDs {
require.Equal(t, int64(i+1), tasks[i].ID)
require.Eventually(t, func() bool {
cnt, err := mgr.GetSubtaskInStatesCnt(ctx, taskID, proto.SubtaskStatePending)
cntByStates, err := mgr.GetSubtaskCntGroupByStates(ctx, taskID, proto.StepOne)
require.NoError(t, err)
return int64(subtaskCnt) == cnt
return int64(subtaskCnt) == cntByStates[proto.SubtaskStatePending]
}, time.Second, 50*time.Millisecond)
}
}
Expand Down Expand Up @@ -627,7 +629,7 @@ func TestManagerDispatchLoop(t *testing.T) {
require.NoError(t, err)
for _, s := range serverInfos {
execID := disttaskutil.GenerateExecID(s)
testutil.InsertSubtask(t, taskMgr, 1000000, proto.StepOne, execID, []byte(""), proto.TaskStatePending, proto.TaskTypeExample, 16)
testutil.InsertSubtask(t, taskMgr, 1000000, proto.StepOne, execID, []byte(""), proto.SubtaskStatePending, proto.TaskTypeExample, 16)
}
concurrencies := []int{4, 6, 16, 2, 4, 4}
waitChannels := make([]chan struct{}, len(concurrencies))
Expand Down
2 changes: 1 addition & 1 deletion pkg/disttask/framework/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ go_test(
embed = [":storage"],
flaky = True,
race = "on",
shard_count = 13,
shard_count = 14,
deps = [
"//pkg/config",
"//pkg/disttask/framework/proto",
Expand Down
Loading

0 comments on commit 99f0349

Please sign in to comment.