From 2003d2a92e51ae16f3b49dc563cd018e4ce2b58f Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Wed, 20 Dec 2023 12:52:02 +0800
Subject: [PATCH] fix comment: refactor ut

---
 .../framework/taskexecutor/manager.go       |  18 +--
 .../framework/taskexecutor/manager_test.go  | 111 ++++++------------
 2 files changed, 35 insertions(+), 94 deletions(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index 48fb9ad321a34..13bfa6b14a561 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -41,10 +41,6 @@ var (
 	recoverMetaInterval = 90 * time.Second
 	retrySQLTimes       = 30
 	retrySQLInterval    = 500 * time.Millisecond
-
-	// for test
-	onRunnableTasksTick = make(chan struct{})
-	onRunnableTaskTick  = make(chan struct{})
 )
 
 // ManagerBuilder is used to build a Manager.
@@ -223,15 +219,13 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 		logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))
 		if !m.slotManager.canAlloc(task) {
-			failpoint.Inject("taskTick", func() {
-				<-onRunnableTasksTick
-			})
+			logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
 			continue
 		}
 		m.addHandlingTask(task.ID)
+		m.slotManager.alloc(task)
 		t := task
 		err = m.executorPool.Run(func() {
-			m.slotManager.alloc(t)
 			defer m.slotManager.free(t.ID)
 			m.onRunnableTask(t)
 			m.removeHandlingTask(t.ID)
@@ -242,10 +236,6 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 			m.logErr(err)
 			return
 		}
-
-		failpoint.Inject("taskTick", func() {
-			<-onRunnableTasksTick
-		})
 	}
 }
 
@@ -422,10 +412,6 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
 		if err != nil {
 			logutil.Logger(m.logCtx).Error("failed to handle task", zap.Error(err))
 		}
-
-		failpoint.Inject("taskTick", func() {
-			<-onRunnableTaskTick
-		})
 	}
 }
 
diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go
index 4e99149ea721a..80e9c1e1c1e09 100644
--- a/pkg/disttask/framework/taskexecutor/manager_test.go
+++ b/pkg/disttask/framework/taskexecutor/manager_test.go
@@ -21,7 +21,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/pkg/disttask/framework/mock"
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
 	"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
@@ -295,90 +294,46 @@ func TestSlotManagerInManager(t *testing.T) {
 		Type: "type",
 	}
 
+	ch := make(chan struct{})
+
 	wg, runFn := getPoolRunFn()
-	// task1 is prior to task2
-	{
-		// mock in manager
-		mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
-			Return([]*proto.Task{task1, task2}, nil)
-		mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
-			unfinishedSubtaskStates...).
-			Return(true, nil)
-		mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
-	}
-	{
-		// mock inside onRunnableTask
-		mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
-		mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
-		mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
-			unfinishedSubtaskStates...).
-			Return(true, nil)
-		// task1 start running
-		mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).Return(nil)
-	}
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
 
-	failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick", "return()")
-	defer func() {
-		failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick")
-	}()
-	go m.fetchAndHandleRunnableTasksLoop()
-
-	{
-		// mock inside onRunnableTask
-		time.Sleep(2 * time.Second)
-		// task2 has been blocked by slot manager
-		require.Equal(t, 0, m.slotManager.available)
-
-		{
-			// mock in manager, task2 has been rejected by slot manager
-			mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
-				unfinishedSubtaskStates...).
-				Return(true, nil)
-			onRunnableTasksTick <- struct{}{}
-			time.Sleep(time.Second)
-		}
-
-		// task1 succeed
-		task1.State = proto.TaskStateSucceed
-		mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
-		mockInternalExecutor.EXPECT().Close()
-		onRunnableTaskTick <- struct{}{}
-		time.Sleep(time.Second)
-		require.Equal(t, 10, m.slotManager.available)
-	}
+	// mock inside onRunnableTask
+	mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	// task1 start running
+	mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
+		<-ch
+		return nil
+	})
 
-	// task2 is after task1
-	{
-		// mock in manager
-		mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
-			Return([]*proto.Task{task2}, nil)
-		mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
-			unfinishedSubtaskStates...).
-			Return(true, nil)
-		mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
-	}
-	{
-		// mock inside onRunnableTask
-		mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
-		mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
-		mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
-			unfinishedSubtaskStates...).
-			Return(true, nil)
-		// task2 start running
-		mockInternalExecutor.EXPECT().Run(gomock.Any(), task2).Return(nil)
-	}
-	onRunnableTasksTick <- struct{}{}
+	m.onRunnableTasks([]*proto.Task{task1, task2})
 	time.Sleep(2 * time.Second)
-	require.Equal(t, 9, m.slotManager.available)
 
-	// task2 succeed
-	task2.State = proto.TaskStateSucceed
-	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
+	// task1 alloc resource success
+	require.Equal(t, 0, m.slotManager.available)
+	require.Equal(t, map[int64]*slotInfo{
+		taskID1: {taskID: int(taskID1), slotCount: 10},
+	}, m.slotManager.executorSlotInfos)
+	ch <- struct{}{}
+
+	// task1 succeed
+	task1.State = proto.TaskStateSucceed
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
 	mockInternalExecutor.EXPECT().Close()
-	onRunnableTaskTick <- struct{}{}
-	time.Sleep(time.Second)
-	require.Equal(t, 10, m.slotManager.available)
 	wg.Wait()
+	require.Equal(t, 10, m.slotManager.available)
+	require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
 }
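
Note (illustration only, not part of the patch): the refactor replaces the failpoint tick
channels with a local channel that the mocked executor's Run blocks on, and manager.go now
calls slotManager.alloc before executorPool.Run rather than inside the pool goroutine,
presumably so the reservation is already visible when canAlloc is checked for the next task.
Below is a minimal, self-contained Go sketch of that synchronization pattern; slotPool and
every other name in it are hypothetical stand-ins, not TiDB APIs.

package main

import (
	"fmt"
	"sync"
)

// slotPool is a tiny stand-in for the slot manager: a counted resource
// guarded by a mutex.
type slotPool struct {
	mu        sync.Mutex
	available int
}

func (p *slotPool) alloc(n int) { p.mu.Lock(); p.available -= n; p.mu.Unlock() }
func (p *slotPool) free(n int)  { p.mu.Lock(); p.available += n; p.mu.Unlock() }
func (p *slotPool) get() int    { p.mu.Lock(); defer p.mu.Unlock(); return p.available }

func main() {
	pool := &slotPool{available: 10}
	ch := make(chan struct{})
	var wg sync.WaitGroup

	// Reserve slots before handing the task to the worker, mirroring the
	// manager.go change that moved slotManager.alloc out of executorPool.Run.
	pool.alloc(10)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer pool.free(10) // mirrors "defer m.slotManager.free(t.ID)"
		<-ch                // the fake task parks here, like the mocked Run
	}()

	// Because alloc ran synchronously, the intermediate state is
	// deterministic and can be asserted without sleeps or failpoints.
	fmt.Println("while running, available =", pool.get()) // 0
	ch <- struct{}{}                                      // release the task
	wg.Wait()
	fmt.Println("after done, available =", pool.get()) // 10
}

Sending on the channel is the only synchronization the test needs: it both unblocks the
fake task and orders the final assertions after the free, which is why the refactored test
can drop the onRunnableTasksTick/onRunnableTaskTick globals and most of the time.Sleep calls.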