diff --git a/pkg/disttask/framework/integrationtests/framework_err_handling_test.go b/pkg/disttask/framework/integrationtests/framework_err_handling_test.go index ad42af609120e..de6dc30b5154e 100644 --- a/pkg/disttask/framework/integrationtests/framework_err_handling_test.go +++ b/pkg/disttask/framework/integrationtests/framework_err_handling_test.go @@ -33,5 +33,8 @@ func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) { testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil) task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1) - require.Equal(t, proto.TaskStateFailed, task.State) + require.Equal(t, proto.TaskStateReverted, task.State) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetStepTwoPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil) + task = testutil.SubmitAndWaitTask(c.Ctx, t, "key2", 1) + require.Equal(t, proto.TaskStateReverted, task.State) } diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index aac05536bd04f..1d252302a7cd2 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -479,13 +479,7 @@ func (s *BaseScheduler) handlePlanErr(err error) error { } task.Error = err s.task.Store(&task) - - if err = s.OnDone(s.ctx, s, &task); err != nil { - return errors.Trace(err) - } - - // TODO: to reverting state? - return s.taskMgr.FailTask(s.ctx, task.ID, task.State, task.Error) + return s.taskMgr.RevertTask(s.ctx, task.ID, task.State, task.Error) } // MockServerInfo exported for scheduler_test.go diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 4914781291989..b72b0e7fef2a9 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -17,7 +17,6 @@ package taskexecutor import ( "context" "sync" - "sync/atomic" "time" "github.com/docker/go-units" @@ -306,12 +305,6 @@ func (m *Manager) cancelTaskExecutors(tasks []*proto.TaskBase) { } } -// TestContext only used in tests. -type TestContext struct { - TestSyncSubtaskRun chan struct{} - mockDown atomic.Bool -} - // startTaskExecutor handles a runnable task. func (m *Manager) startTaskExecutor(task *proto.Task) { // runCtx only used in executor.Run, cancel in m.fetchAndFastCancelTasks. diff --git a/pkg/disttask/framework/testutil/scheduler_util.go b/pkg/disttask/framework/testutil/scheduler_util.go index b00f45e992d40..628e039453805 100644 --- a/pkg/disttask/framework/testutil/scheduler_util.go +++ b/pkg/disttask/framework/testutil/scheduler_util.go @@ -125,6 +125,17 @@ func GetPlanNotRetryableErrSchedulerExt(ctrl *gomock.Controller) scheduler.Exten }) } +// GetStepTwoPlanNotRetryableErrSchedulerExt returns mock scheduler.Extension which will generate non retryable error when planning for step two. +func GetStepTwoPlanNotRetryableErrSchedulerExt(ctrl *gomock.Controller) scheduler.Extension { + return GetMockSchedulerExt(ctrl, SchedulerInfo{ + AllErrorRetryable: false, + StepInfos: []StepInfo{ + {Step: proto.StepOne, SubtaskCnt: 10}, + {Step: proto.StepTwo, Err: errors.New("not retryable err"), ErrRepeatCount: math.MaxInt64}, + }, + }) +} + // GetPlanErrSchedulerExt returns mock scheduler.Extension which will generate error when planning. func GetPlanErrSchedulerExt(ctrl *gomock.Controller, testContext *TestContext) scheduler.Extension { mockScheduler := mockDispatch.NewMockExtension(ctrl) diff --git a/tests/realtikvtest/importintotest4/global_sort_test.go b/tests/realtikvtest/importintotest4/global_sort_test.go index 32f4da62c02ae..6427e7a563e0d 100644 --- a/tests/realtikvtest/importintotest4/global_sort_test.go +++ b/tests/realtikvtest/importintotest4/global_sort_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/importinto" @@ -117,7 +118,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() { s.Eventually(func() bool { task, err2 = taskManager.GetTaskByKeyWithHistory(ctx, importinto.TaskKey(int64(jobID))) s.NoError(err2) - return task.State == "failed" + return task.State == proto.TaskStateReverted }, 30*time.Second, 300*time.Millisecond) // check all sorted data cleaned up <-scheduler.WaitCleanUpFinished