From 2bf88b2c7f5ac8dbb8b9f45b10458e8487fcca3b Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Fri, 23 Feb 2024 14:37:44 +0800 Subject: [PATCH 1/2] refine --- .../integrationtests/framework_err_handling_test.go | 5 ++++- pkg/disttask/framework/scheduler/scheduler.go | 8 +------- pkg/disttask/framework/taskexecutor/manager.go | 7 ------- pkg/disttask/framework/testutil/scheduler_util.go | 11 +++++++++++ 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/disttask/framework/integrationtests/framework_err_handling_test.go b/pkg/disttask/framework/integrationtests/framework_err_handling_test.go index ad42af609120e..de6dc30b5154e 100644 --- a/pkg/disttask/framework/integrationtests/framework_err_handling_test.go +++ b/pkg/disttask/framework/integrationtests/framework_err_handling_test.go @@ -33,5 +33,8 @@ func TestPlanNotRetryableOnNextSubtasksBatchErr(t *testing.T) { testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil) task := testutil.SubmitAndWaitTask(c.Ctx, t, "key1", 1) - require.Equal(t, proto.TaskStateFailed, task.State) + require.Equal(t, proto.TaskStateReverted, task.State) + testutil.RegisterTaskMeta(t, c.MockCtrl, testutil.GetStepTwoPlanNotRetryableErrSchedulerExt(c.MockCtrl), c.TestContext, nil) + task = testutil.SubmitAndWaitTask(c.Ctx, t, "key2", 1) + require.Equal(t, proto.TaskStateReverted, task.State) } diff --git a/pkg/disttask/framework/scheduler/scheduler.go b/pkg/disttask/framework/scheduler/scheduler.go index aac05536bd04f..1d252302a7cd2 100644 --- a/pkg/disttask/framework/scheduler/scheduler.go +++ b/pkg/disttask/framework/scheduler/scheduler.go @@ -479,13 +479,7 @@ func (s *BaseScheduler) handlePlanErr(err error) error { } task.Error = err s.task.Store(&task) - - if err = s.OnDone(s.ctx, s, &task); err != nil { - return errors.Trace(err) - } - - // TODO: to reverting state? - return s.taskMgr.FailTask(s.ctx, task.ID, task.State, task.Error) + return s.taskMgr.RevertTask(s.ctx, task.ID, task.State, task.Error) } // MockServerInfo exported for scheduler_test.go diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go index 4914781291989..b72b0e7fef2a9 100644 --- a/pkg/disttask/framework/taskexecutor/manager.go +++ b/pkg/disttask/framework/taskexecutor/manager.go @@ -17,7 +17,6 @@ package taskexecutor import ( "context" "sync" - "sync/atomic" "time" "github.com/docker/go-units" @@ -306,12 +305,6 @@ func (m *Manager) cancelTaskExecutors(tasks []*proto.TaskBase) { } } -// TestContext only used in tests. -type TestContext struct { - TestSyncSubtaskRun chan struct{} - mockDown atomic.Bool -} - // startTaskExecutor handles a runnable task. func (m *Manager) startTaskExecutor(task *proto.Task) { // runCtx only used in executor.Run, cancel in m.fetchAndFastCancelTasks. diff --git a/pkg/disttask/framework/testutil/scheduler_util.go b/pkg/disttask/framework/testutil/scheduler_util.go index b00f45e992d40..628e039453805 100644 --- a/pkg/disttask/framework/testutil/scheduler_util.go +++ b/pkg/disttask/framework/testutil/scheduler_util.go @@ -125,6 +125,17 @@ func GetPlanNotRetryableErrSchedulerExt(ctrl *gomock.Controller) scheduler.Exten }) } +// GetStepTwoPlanNotRetryableErrSchedulerExt returns mock scheduler.Extension which will generate non retryable error when planning for step two. +func GetStepTwoPlanNotRetryableErrSchedulerExt(ctrl *gomock.Controller) scheduler.Extension { + return GetMockSchedulerExt(ctrl, SchedulerInfo{ + AllErrorRetryable: false, + StepInfos: []StepInfo{ + {Step: proto.StepOne, SubtaskCnt: 10}, + {Step: proto.StepTwo, Err: errors.New("not retryable err"), ErrRepeatCount: math.MaxInt64}, + }, + }) +} + // GetPlanErrSchedulerExt returns mock scheduler.Extension which will generate error when planning. func GetPlanErrSchedulerExt(ctrl *gomock.Controller, testContext *TestContext) scheduler.Extension { mockScheduler := mockDispatch.NewMockExtension(ctrl) From b5a2fad42e8584156b4d2c72b1ea06615305d38c Mon Sep 17 00:00:00 2001 From: ywqzzy <592838129@qq.com> Date: Fri, 23 Feb 2024 15:35:51 +0800 Subject: [PATCH 2/2] fix --- tests/realtikvtest/importintotest4/global_sort_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/realtikvtest/importintotest4/global_sort_test.go b/tests/realtikvtest/importintotest4/global_sort_test.go index 32f4da62c02ae..6427e7a563e0d 100644 --- a/tests/realtikvtest/importintotest4/global_sort_test.go +++ b/tests/realtikvtest/importintotest4/global_sort_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/pingcap/tidb/pkg/disttask/framework/proto" "github.com/pingcap/tidb/pkg/disttask/framework/scheduler" "github.com/pingcap/tidb/pkg/disttask/framework/storage" "github.com/pingcap/tidb/pkg/disttask/importinto" @@ -117,7 +118,7 @@ func (s *mockGCSSuite) TestGlobalSortBasic() { s.Eventually(func() bool { task, err2 = taskManager.GetTaskByKeyWithHistory(ctx, importinto.TaskKey(int64(jobID))) s.NoError(err2) - return task.State == "failed" + return task.State == proto.TaskStateReverted }, 30*time.Second, 300*time.Millisecond) // check all sorted data cleaned up <-scheduler.WaitCleanUpFinished