Skip to content

Commit

Permalink
fix comment: refactor ut
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang committed Dec 20, 2023
1 parent 7d9f45e commit 2003d2a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 94 deletions.
18 changes: 2 additions & 16 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ var (
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond

// for test
onRunnableTasksTick = make(chan struct{})
onRunnableTaskTick = make(chan struct{})
)

// ManagerBuilder is used to build a Manager.
Expand Down Expand Up @@ -223,15 +219,13 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

if !m.slotManager.canAlloc(task) {
failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
continue
}
m.addHandlingTask(task.ID)
m.slotManager.alloc(task)
t := task
err = m.executorPool.Run(func() {
m.slotManager.alloc(t)
defer m.slotManager.free(t.ID)
m.onRunnableTask(t)
m.removeHandlingTask(t.ID)
Expand All @@ -242,10 +236,6 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
m.logErr(err)
return
}

failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
}
}

Expand Down Expand Up @@ -422,10 +412,6 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
if err != nil {
logutil.Logger(m.logCtx).Error("failed to handle task", zap.Error(err))
}

failpoint.Inject("taskTick", func() {
<-onRunnableTaskTick
})
}
}

Expand Down
111 changes: 33 additions & 78 deletions pkg/disttask/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/disttask/framework/mock"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/spool"
Expand Down Expand Up @@ -295,90 +294,46 @@ func TestSlotManagerInManager(t *testing.T) {
Type: "type",
}

ch := make(chan struct{})

wg, runFn := getPoolRunFn()

// task1 is prior to task2
{
// mock in manager
mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task1, task2}, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
}
{
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).Return(nil)
}
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)

failpoint.Enable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick", "return()")
defer func() {
failpoint.Disable("github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/taskTick")
}()
go m.fetchAndHandleRunnableTasksLoop()

{
// mock inside onRunnableTask
time.Sleep(2 * time.Second)
// task2 has been blocked by slot manager
require.Equal(t, 0, m.slotManager.available)

{
// mock in manager, task2 has been rejected by slot manager
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
onRunnableTasksTick <- struct{}{}
time.Sleep(time.Second)
}

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()
onRunnableTaskTick <- struct{}{}
time.Sleep(time.Second)
require.Equal(t, 10, m.slotManager.available)
}
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
<-ch
return nil
})

// task2 is after task1
{
// mock in manager
mockTaskTable.EXPECT().GetTasksInStates(m.ctx, proto.TaskStateRunning, proto.TaskStateReverting).
Return([]*proto.Task{task2}, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
}
{
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task2 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task2).Return(nil)
}
onRunnableTasksTick <- struct{}{}
m.onRunnableTasks([]*proto.Task{task1, task2})
time.Sleep(2 * time.Second)

require.Equal(t, 9, m.slotManager.available)
// task2 succeed
task2.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
// task1 alloc resource success
require.Equal(t, 0, m.slotManager.available)
require.Equal(t, map[int64]*slotInfo{
taskID1: {taskID: int(taskID1), slotCount: 10},
}, m.slotManager.executorSlotInfos)
ch <- struct{}{}

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()
onRunnableTaskTick <- struct{}{}
time.Sleep(time.Second)

require.Equal(t, 10, m.slotManager.available)
wg.Wait()
require.Equal(t, 10, m.slotManager.available)
require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
}

0 comments on commit 2003d2a

Please sign in to comment.