disttask: Refactor taskExecutor slotManager to handle task occupation #49713

Merged: 10 commits (Dec 28, 2023)
pkg/disttask/framework/proto/task.go (1 change: 1 addition & 0 deletions)
@@ -165,6 +165,7 @@ var (
)

// Compare compares two tasks by task order.
// It returns a value < 0 when the priority of t is higher than that of other.
func (t *Task) Compare(other *Task) int {
if t.Priority != other.Priority {
return t.Priority - other.Priority
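
The added comment line documents the sign convention. Below is a toy illustration of how that convention turns into scheduling order; proto.Task is replaced by a stand-in type, and only the Priority branch visible in this hunk is mirrored.

```go
package main

import (
	"fmt"
	"sort"
)

// toyTask stands in for proto.Task; only the fields this example needs.
type toyTask struct {
	ID       int64
	Priority int
}

// compare mirrors the visible part of (*proto.Task).Compare: the difference of
// the Priority fields, where a result < 0 means t ranks before other.
func (t *toyTask) compare(other *toyTask) int {
	return t.Priority - other.Priority
}

func main() {
	tasks := []*toyTask{{ID: 1, Priority: 0}, {ID: 3, Priority: -1}, {ID: 2, Priority: 2}}
	// Sort so that higher-priority tasks (smaller Priority value) come first.
	sort.Slice(tasks, func(i, j int) bool { return tasks[i].compare(tasks[j]) < 0 })
	for _, task := range tasks {
		fmt.Printf("task %d (priority %d)\n", task.ID, task.Priority)
	}
	// Prints task 3 first, then task 1, then task 2.
}
```
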
pkg/disttask/framework/taskexecutor/manager.go (62 changes: 43 additions & 19 deletions)
@@ -37,10 +37,15 @@ import (
var (
executorPoolSize int32 = 4
// same as scheduler
checkTime = 300 * time.Millisecond
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond
checkTime = 300 * time.Millisecond
recoverMetaInterval = 90 * time.Second
retrySQLTimes = 30
retrySQLInterval = 500 * time.Millisecond
unfinishedSubtaskStates = []interface{}{
proto.TaskStatePending, proto.TaskStateRevertPending,
// for the case that TiDB is restarted while the subtask is running.
proto.TaskStateRunning, proto.TaskStateReverting,
}
)
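
The new package-level unfinishedSubtaskStates slice replaces the state lists repeated at each call site (and the duplicate removed from manager_test.go below). It is declared as []interface{} so it can be expanded straight into the variadic tail of HasSubtasksInStates, as the call sites further down show. A minimal sketch of the pattern, with a hypothetical stand-in for the real method:

```go
package main

import "fmt"

// hasSubtasksInStates is a hypothetical stand-in; only its variadic
// ...interface{} tail is inferred from the call sites in this diff.
func hasSubtasksInStates(execID string, taskID int64, states ...interface{}) bool {
	fmt.Printf("checking task %d on %s for states %v\n", taskID, execID, states)
	return len(states) > 0
}

func main() {
	// Shared definition, analogous to the unfinishedSubtaskStates variable above.
	unfinished := []interface{}{"pending", "revert_pending", "running", "reverting"}
	// The ... expansion passes every element as a separate variadic argument.
	exist := hasSubtasksInStates(":4000", 1, unfinished...)
	fmt.Println(exist)
}
```
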

// ManagerBuilder is used to build a Manager.
@@ -90,8 +95,9 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
logCtx: logutil.WithFields(context.Background()),
newPool: b.newPool,
slotManager: &slotManager{
executorSlotInfos: make(map[int64]*slotInfo),
available: cpu.GetCPUCount(),
taskID2Index: make(map[int64]int),
executorTasks: make([]*proto.Task, 0),
available: cpu.GetCPUCount(),
},
}
m.ctx, m.cancel = context.WithCancel(ctx)
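
For orientation, the initialization above implies the shape of the refactored slotManager; a sketch follows. Only the field names executorTasks, taskID2Index, and available (plus the removed executorSlotInfos) are visible in this diff; the mutex and the field comments are assumptions.

```go
package taskexecutor

import (
	"sync"

	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
)

// slotManager sketch: tracks which tasks currently hold CPU slots on this node.
type slotManager struct {
	sync.RWMutex // assumed: guards the fields below

	// executorTasks lists the tasks that currently occupy slots, replacing the
	// old executorSlotInfos map so tasks can be examined in priority order.
	executorTasks []*proto.Task
	// taskID2Index maps a task ID to its position in executorTasks.
	taskID2Index map[int64]int
	// available is the number of slots still free; initialized to cpu.GetCPUCount().
	available int
}
```
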
@@ -204,10 +210,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
tasks = m.filterAlreadyHandlingTasks(tasks)

for _, task := range tasks {
exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRevertPending,
// for the case that the tidb is restarted when the subtask is running.
proto.TaskStateRunning, proto.TaskStateReverting)
exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step, unfinishedSubtaskStates...)
if err != nil {
logutil.Logger(m.logCtx).Error("check subtask exist failed", zap.Error(err))
m.logErr(err)
@@ -218,8 +221,15 @@
}
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

if !m.slotManager.canAlloc(task) {
logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
if len(tasksNeedFree) > 0 {
m.cancelTaskExecutors(tasksNeedFree)
// do not handle tasks with lower priority while the current task is waiting for other tasks to free their slots.
break
}

if !canAlloc {
logutil.Logger(m.logCtx).Debug("subtask has been rejected", zap.Int64("task-id", task.ID))
continue
}
m.addHandlingTask(task.ID)
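
canAlloc now reports both whether the task can take slots immediately and which running tasks would have to give theirs up first (the occupation case). The method body is not part of this hunk, so the following is only a sketch of the contract, inferred from this call site and from TestSlotManagerInManager below; the traversal order over executorTasks is an assumption.

```go
// canAlloc sketch (assumed implementation, building on the slotManager fields
// sketched earlier): report whether task fits now, or which lower-priority
// tasks must be paused so that it can fit later.
func (sm *slotManager) canAlloc(task *proto.Task) (canAlloc bool, tasksNeedFree []*proto.Task) {
	if task.Concurrency <= sm.available {
		return true, nil
	}
	// Not enough free slots: check whether pausing currently running tasks of
	// lower priority would reclaim enough (Compare < 0 means task ranks higher).
	reclaimable := sm.available
	for i := len(sm.executorTasks) - 1; i >= 0; i-- {
		running := sm.executorTasks[i]
		if task.Compare(running) >= 0 {
			break // never occupy tasks of equal or higher priority
		}
		tasksNeedFree = append(tasksNeedFree, running)
		reclaimable += running.Concurrency
		if reclaimable >= task.Concurrency {
			// The caller pauses these executors first; the slots are only
			// handed over once they have actually been freed.
			return false, tasksNeedFree
		}
	}
	return false, nil
}
```

Under this sketch, task2 in the test below is simply rejected (it has the same priority as task1), while task3's higher priority causes task1 to be returned in tasksNeedFree, which matches the occupation scenario exercised by TestSlotManagerInManager.
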
@@ -313,6 +323,19 @@ func (m *Manager) cancelAllRunningTasks() {
}
}

func (m *Manager) cancelTaskExecutors(tasks []*proto.Task) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, task := range tasks {
logutil.Logger(m.logCtx).Info("cancelTasks", zap.Any("task_id", task.ID))
if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
// Pause all running subtasks, don't mark subtasks as canceled.
Contributor: change the comments

Member Author: Suggested change:
- // Pause all running subtasks, don't mark subtasks as canceled.
+ // Pause given subtasks temporarily, don't mark subtasks as canceled.
like this?

// Should not change the subtask's state.
cancel(nil)
}
Contributor: why not m.slotManager.free(t.ID) here?

Contributor: Rest LGTM

}
}
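
cancelTaskExecutors reuses the cancel functions registered per handled task, but calls them with a nil cause, consistent with the comment that the subtask state should not be changed. Below is a standalone illustration of the standard-library behavior relied on here (not code from this PR); the new assertion in TestManageTask checks exactly this ctx.Err() value.

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// The manager keeps one CancelCauseFunc per handled task (registerCancelFunc).
	ctx, cancel := context.WithCancelCause(context.Background())

	// cancelTaskExecutors calls the stored function with a nil cause: the task
	// context is canceled so the executor stops, but no custom error cause is
	// recorded (presumably how a pause is told apart from a real cancellation).
	cancel(nil)

	fmt.Println(ctx.Err())          // context.Canceled
	fmt.Println(context.Cause(ctx)) // context.Canceled (a nil cause defaults to Canceled)
}
```
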

// filterAlreadyHandlingTasks filters the tasks that are already handled.
func (m *Manager) filterAlreadyHandlingTasks(tasks []*proto.Task) []*proto.Task {
m.mu.RLock()
@@ -375,7 +398,7 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
}
}()
})
task, err := m.taskTable.GetTaskByID(m.ctx, task.ID)
task, err = m.taskTable.GetTaskByID(m.ctx, task.ID)
if err != nil {
m.logErr(err)
return
Expand All @@ -388,19 +411,20 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
zap.Int64("task-id", task.ID), zap.Int64("step", int64(task.Step)), zap.Stringer("state", task.State))
return
}
if exist, err := m.taskTable.HasSubtasksInStates(
m.ctx,
m.id, task.ID, task.Step,
proto.TaskStatePending, proto.TaskStateRevertPending,
// for the case that the tidb is restarted when the subtask is running.
proto.TaskStateRunning, proto.TaskStateReverting); err != nil {
if exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step,
unfinishedSubtaskStates...); err != nil {
m.logErr(err)
return
} else if !exist {
continue
}
switch task.State {
case proto.TaskStateRunning:
if taskCtx.Err() != nil {
Contributor: seems no need to print this

Member Author: Prefer to keep it. We can know some subtasks have been canceled

Contributor: there's already log at line 386; we also log in cancelTaskExecutors

Member Author (okJiang, Dec 28, 2023): updated log level to debug

Contributor: just remove it

logutil.Logger(m.logCtx).Debug("onRunnableTask exit for taskCtx.Done",
zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type), zap.Error(taskCtx.Err()))
return
}
// use taskCtx for canceling.
err = executor.Run(taskCtx, task)
case proto.TaskStatePausing:
pkg/disttask/framework/taskexecutor/manager_test.go (202 changes: 164 additions & 38 deletions)
@@ -29,11 +29,6 @@ import (
"go.uber.org/mock/gomock"
)

var unfinishedSubtaskStates = []interface{}{
proto.TaskStatePending, proto.TaskStateRevertPending,
proto.TaskStateRunning, proto.TaskStateReverting,
}

func getPoolRunFn() (*sync.WaitGroup, func(f func()) error) {
wg := &sync.WaitGroup{}
return wg, func(f func()) error {
@@ -78,6 +73,12 @@ func TestManageTask(t *testing.T) {
m.cancelAllRunningTasks()
require.Equal(t, context.Canceled, ctx1.Err())

m.addHandlingTask(2)
ctx1, cancel1 = context.WithCancelCause(context.Background())
m.registerCancelFunc(2, cancel1)
m.cancelTaskExecutors([]*proto.Task{{ID: 2}})
require.Equal(t, context.Canceled, ctx1.Err())

// test cancel.
m.addHandlingTask(1)
ctx2, cancel2 := context.WithCancelCause(context.Background())
@@ -272,32 +273,87 @@ func TestSlotManagerInManager(t *testing.T) {
require.NoError(t, err)
m.slotManager.available = 10

taskID1 := int64(1)
taskID2 := int64(2)

now := time.Now()
var (
taskID1 = int64(1)
taskID2 = int64(2)

task1 = &proto.Task{
ID: taskID1,
State: proto.TaskStateRunning,
Concurrency: 10,
Step: proto.StepOne,
Type: "type",
}
task2 = &proto.Task{
ID: taskID2,
State: proto.TaskStateRunning,
Concurrency: 1,
Step: proto.StepOne,
Type: "type",
}
)

ch := make(chan error)
defer close(ch)
wg, runFn := getPoolRunFn()

task1 := &proto.Task{
ID: taskID1,
State: proto.TaskStateRunning,
CreateTime: now,
Concurrency: 10,
Step: proto.StepOne,
Type: "type",
}
task2 := &proto.Task{
ID: taskID2,
State: proto.TaskStateRunning,
CreateTime: now,
Concurrency: 1,
Step: proto.StepOne,
Type: "type",
}
// ******** Test task1 alloc success ********
// 1. task1 alloc success
// 2. task2 alloc failed
// 3. task1 run success
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)

ch := make(chan struct{})
// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
return <-ch
})

wg, runFn := getPoolRunFn()
m.onRunnableTasks([]*proto.Task{task1, task2})
// task1 alloc resource success
require.Eventually(t, func() bool {
if m.slotManager.available != 0 || len(m.slotManager.executorTasks) != 1 ||
m.slotManager.executorTasks[0].ID != task1.ID {
return false
}
return ctrl.Satisfied()
}, 2*time.Second, 300*time.Millisecond)
ch <- nil

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockInternalExecutor.EXPECT().Close()
wg.Wait()
require.Equal(t, 10, m.slotManager.available)
require.Equal(t, 0, len(m.slotManager.executorTasks))
require.True(t, ctrl.Satisfied())

// ******** Test task occupation ********
task1.State = proto.TaskStateRunning
var (
taskID3 = int64(3)
task3 = &proto.Task{
ID: taskID3,
State: proto.TaskStateRunning,
Concurrency: 1,
Priority: -1,
Step: proto.StepOne,
Type: "type",
}
)
// 1. task1 alloc success
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
@@ -314,26 +370,96 @@
Return(true, nil)
// task1 start running
mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
<-ch
return nil
return <-ch
})

m.onRunnableTasks([]*proto.Task{task1, task2})
time.Sleep(2 * time.Second)

// task1 alloc resource success
require.Eventually(t, func() bool {
if m.slotManager.available != 0 || len(m.slotManager.executorTasks) != 1 ||
m.slotManager.executorTasks[0].ID != task1.ID {
return false
}
return ctrl.Satisfied()
}, 2*time.Second, 300*time.Millisecond)

// 2. task1 is occupied by task3, so task1 starts to pause
// 3. task3 is waiting for task1 to be released, and task2 can't be allocated
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)

// the priority of task3 is higher than that of task2, so task3 is handled before task2
m.onRunnableTasks([]*proto.Task{task3, task2})
require.Equal(t, 0, m.slotManager.available)
require.Equal(t, map[int64]*slotInfo{
taskID1: {taskID: int(taskID1), slotCount: 10},
}, m.slotManager.executorSlotInfos)
ch <- struct{}{}
require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
require.True(t, ctrl.Satisfied())

// task1 succeed
task1.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockInternalExecutor.EXPECT().Close()

// 4. task1 is released; task3 alloc succeeds and starts to run
ch <- context.Canceled
wg.Wait()
require.Equal(t, 10, m.slotManager.available)
require.Len(t, m.slotManager.executorTasks, 0)
require.True(t, ctrl.Satisfied())

mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)

// mock inside onRunnableTask
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID3).Return(task3, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockInternalExecutor.EXPECT().Run(gomock.Any(), task3).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
return <-ch
})

// 5. available slots are enough, so task2 alloc succeeds
mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
unfinishedSubtaskStates...).
Return(true, nil)
mockInternalExecutor.EXPECT().Run(gomock.Any(), task2).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
return <-ch
})

m.onRunnableTasks([]*proto.Task{task3, task1, task2})
time.Sleep(2 * time.Second)
require.Eventually(t, func() bool {
if m.slotManager.available != 8 || len(m.slotManager.executorTasks) != 2 {
return false
}
return ctrl.Satisfied()
}, 2*time.Second, 300*time.Millisecond)

// 6. task3/task2 run success
task3.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID3).Return(task3, nil)
mockInternalExecutor.EXPECT().Close()
task2.State = proto.TaskStateSucceed
mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
mockInternalExecutor.EXPECT().Close()
ch <- nil
ch <- nil
wg.Wait()
require.Equal(t, 10, m.slotManager.available)
require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
require.Equal(t, 0, len(m.slotManager.executorTasks))
require.True(t, ctrl.Satisfied())
}