From 8928d78e85e60b9b4a03e9aafb95eee4358bcc8f Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Fri, 22 Dec 2023 14:32:40 +0800
Subject: [PATCH 1/9] Refactor slotManager to handle task occupation

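slotManager now stores the running *proto.Task directly instead of a separate
slotInfo struct, and canAlloc additionally reports which lower-priority tasks
would have to give up their slots before the incoming task fits. A new
taskWaitAlloc field reserves the slots for the task that triggered the
preemption, so lower-priority tasks cannot jump the queue while it waits.

Illustrative sketch only (not part of this patch): how a scheduling loop is
expected to consume the new canAlloc result. tryScheduleTask and
cancelExecutors are hypothetical names; the real call site is
Manager.onRunnableTasks.

    func tryScheduleTask(sm *slotManager, task *proto.Task,
        cancelExecutors func([]*proto.Task)) bool {
        canAlloc, tasksNeedFree := sm.canAlloc(task)
        if len(tasksNeedFree) > 0 {
            // stop the lower-priority executors; their slots come back
            // through slotManager.free once they exit
            cancelExecutors(tasksNeedFree)
        }
        if !canAlloc {
            return false // retried on a later scheduling round
        }
        sm.alloc(task)
        return true
    }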
---
 pkg/disttask/framework/proto/task.go          |  1 +
 .../framework/taskexecutor/manager.go         |  6 +-
 .../framework/taskexecutor/manager_test.go    |  4 +-
 pkg/disttask/framework/taskexecutor/slot.go   | 66 +++++++++----
 .../framework/taskexecutor/slot_test.go       | 93 ++++++++++++++++---
 5 files changed, 137 insertions(+), 33 deletions(-)

diff --git a/pkg/disttask/framework/proto/task.go b/pkg/disttask/framework/proto/task.go
index 46c930005b69a..9b180ee368b9e 100644
--- a/pkg/disttask/framework/proto/task.go
+++ b/pkg/disttask/framework/proto/task.go
@@ -165,6 +165,7 @@ var (
 )
 
 // Compare compares two tasks by task order.
+// A return value < 0 means that t has a higher priority than other.
 func (t *Task) Compare(other *Task) int {
 	if t.Priority != other.Priority {
 		return t.Priority - other.Priority
diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index 30042ea1ec772..8689ee21623f5 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -90,7 +90,7 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
 		logCtx:    logutil.WithFields(context.Background()),
 		newPool:   b.newPool,
 		slotManager: &slotManager{
-			executorSlotInfos: make(map[int64]*slotInfo),
+			executorSlotInfos: make(map[int64]*proto.Task),
 			available:         cpu.GetCPUCount(),
 		},
 	}
@@ -218,7 +218,9 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 		}
 		logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))
 
-		if !m.slotManager.canAlloc(task) {
+		canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
+		m.onCanceledTasks(context.Background(), tasksNeedFree)
+		if !canAlloc {
 			logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
 			continue
 		}
diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go
index 80e9c1e1c1e09..7427fa37fd3a7 100644
--- a/pkg/disttask/framework/taskexecutor/manager_test.go
+++ b/pkg/disttask/framework/taskexecutor/manager_test.go
@@ -323,8 +323,8 @@ func TestSlotManagerInManager(t *testing.T) {
 
 	// task1 alloc resource success
 	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, map[int64]*slotInfo{
-		taskID1: {taskID: int(taskID1), slotCount: 10},
+	require.Equal(t, map[int64]*proto.Task{
+		taskID1: task1,
 	}, m.slotManager.executorSlotInfos)
 	ch <- struct{}{}
 
diff --git a/pkg/disttask/framework/taskexecutor/slot.go b/pkg/disttask/framework/taskexecutor/slot.go
index e788312fbccc3..c586fa23d26ca 100644
--- a/pkg/disttask/framework/taskexecutor/slot.go
+++ b/pkg/disttask/framework/taskexecutor/slot.go
@@ -15,6 +15,7 @@
 package taskexecutor
 
 import (
+	"slices"
 	"sync"
 
 	"github.com/pingcap/tidb/pkg/disttask/framework/proto"
@@ -23,29 +24,22 @@ import (
 // slotManager is used to manage the slots of the executor.
 type slotManager struct {
 	sync.RWMutex
-	// taskID -> slotInfo
-	executorSlotInfos map[int64]*slotInfo
+	executorSlotInfos map[int64]*proto.Task
 
 	// The number of slots that can be used by the executor.
 	// It is always equal to CPU cores of the instance.
 	available int
-}
 
-type slotInfo struct {
-	taskID int
-	// priority will be used in future
-	priority  int
-	slotCount int
+	taskWaitAlloc *proto.Task
 }
 
 func (sm *slotManager) alloc(task *proto.Task) {
 	sm.Lock()
 	defer sm.Unlock()
-	sm.executorSlotInfos[task.ID] = &slotInfo{
-		taskID:    int(task.ID),
-		priority:  task.Priority,
-		slotCount: task.Concurrency,
+	if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) == 0 {
+		sm.taskWaitAlloc = nil
 	}
+	sm.executorSlotInfos[task.ID] = task
 
 	sm.available -= task.Concurrency
 }
@@ -57,14 +51,54 @@ func (sm *slotManager) free(taskID int64) {
 	slotInfo, ok := sm.executorSlotInfos[taskID]
 	if ok {
 		delete(sm.executorSlotInfos, taskID)
-		sm.available += slotInfo.slotCount
+		sm.available += slotInfo.Concurrency
 	}
 }
 
-// canReserve is used to check whether the instance has enough slots to run the task.
-func (sm *slotManager) canAlloc(task *proto.Task) bool {
+// canAlloc is used to check whether the instance has enough slots to run the task.
+func (sm *slotManager) canAlloc(task *proto.Task) (canAlloc bool, tasksNeedFree []*proto.Task) {
 	sm.RLock()
 	defer sm.RUnlock()
 
-	return sm.available >= task.Concurrency
+	// If a task is already waiting for allocation, we can't allocate a lower-priority task.
+	if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) < 0 {
+		return false, nil
+	}
+
+	if sm.available >= task.Concurrency {
+		// If the upcoming task's priority is higher than that of the task waiting for
+		// allocation, clear that pending reservation so the upcoming task can take the slots.
+		if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) > 0 {
+			sm.taskWaitAlloc = nil
+		}
+		return true, nil
+	}
+
+	// If this task is already the one waiting for allocation, we do not need to free any tasks again.
+	if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) == 0 {
+		return false, nil
+	}
+
+	allSlotInfos := make([]*proto.Task, 0, len(sm.executorSlotInfos))
+	for _, slotInfo := range sm.executorSlotInfos {
+		allSlotInfos = append(allSlotInfos, slotInfo)
+	}
+	slices.SortFunc(allSlotInfos, func(a, b *proto.Task) int {
+		return a.Compare(b)
+	})
+
+	usedSlots := 0
+	for _, slotInfo := range allSlotInfos {
+		if slotInfo.Compare(task) < 0 {
+			continue
+		}
+		tasksNeedFree = append(tasksNeedFree, slotInfo)
+		usedSlots += slotInfo.Concurrency
+		if sm.available+usedSlots >= task.Concurrency {
+			sm.taskWaitAlloc = task
+			return false, tasksNeedFree
+		}
+	}
+
+	return false, nil
 }
diff --git a/pkg/disttask/framework/taskexecutor/slot_test.go b/pkg/disttask/framework/taskexecutor/slot_test.go
index 885c47745ac65..d3feaf7eab61b 100644
--- a/pkg/disttask/framework/taskexecutor/slot_test.go
+++ b/pkg/disttask/framework/taskexecutor/slot_test.go
@@ -23,32 +23,99 @@ import (
 
 func TestSlotManager(t *testing.T) {
 	sm := slotManager{
-		executorSlotInfos: make(map[int64]*slotInfo),
+		executorSlotInfos: make(map[int64]*proto.Task),
 		available:         10,
 	}
 
 	var (
 		taskID  = int64(1)
 		taskID2 = int64(2)
+		taskID3 = int64(3)
+		taskID4 = int64(4)
+		task    = &proto.Task{
+			ID:          taskID,
+			Priority:    1,
+			Concurrency: 1,
+		}
+		task2 = &proto.Task{
+			ID:          taskID2,
+			Priority:    2,
+			Concurrency: 10,
+		}
 	)
 
-	task := &proto.Task{
-		ID:          taskID,
-		Priority:    1,
-		Concurrency: 1,
-	}
-	require.True(t, sm.canAlloc(task))
+	canAlloc, tasksNeedFree := sm.canAlloc(task)
+	require.True(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
 	sm.alloc(task)
-	require.Equal(t, 1, sm.executorSlotInfos[taskID].priority)
-	require.Equal(t, 1, sm.executorSlotInfos[taskID].slotCount)
+	require.Len(t, sm.executorSlotInfos, 1)
+	require.Equal(t, task, sm.executorSlotInfos[taskID])
 	require.Equal(t, 9, sm.available)
 
-	require.False(t, sm.canAlloc(&proto.Task{
-		ID:          taskID2,
-		Priority:    2,
+	// the available slots are not enough for task2
+	canAlloc, tasksNeedFree = sm.canAlloc(task2)
+	require.False(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
+
+	// increase the priority of task2, task2 is waiting for allocation
+	task2.Priority = 0
+	canAlloc, tasksNeedFree = sm.canAlloc(task2)
+	require.False(t, canAlloc)
+	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
+	require.Equal(t, task2, sm.taskWaitAlloc)
+
+	// task with lower priority is restricted
+	task3 := &proto.Task{
+		ID:          taskID3,
+		Priority:    3,
+		Concurrency: 1,
+	}
+	canAlloc, tasksNeedFree = sm.canAlloc(task3)
+	require.False(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
+
+	// increase the priority of task3, it can be allocated now
+	task3.Priority = -1
+	canAlloc, tasksNeedFree = sm.canAlloc(task3)
+	require.True(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
+	// task3 is allocated ahead of the lower-priority task2
+	require.Nil(t, sm.taskWaitAlloc)
+	sm.alloc(task3)
+	require.Len(t, sm.executorSlotInfos, 2)
+	require.Equal(t, task3, sm.executorSlotInfos[taskID3])
+	require.Equal(t, 8, sm.available)
+	sm.free(taskID3)
+	require.Len(t, sm.executorSlotInfos, 1)
+	require.Nil(t, sm.executorSlotInfos[taskID3])
+
+	// task2 is waiting for allocation again
+	canAlloc, tasksNeedFree = sm.canAlloc(task2)
+	require.False(t, canAlloc)
+	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
+	require.Equal(t, task2, sm.taskWaitAlloc)
+
+	// task2 is occupied by task4
+	task4 := &proto.Task{
+		ID:          taskID4,
+		Priority:    -1,
 		Concurrency: 10,
-	}))
+	}
+	canAlloc, tasksNeedFree = sm.canAlloc(task4)
+	require.False(t, canAlloc)
+	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
+	// task 4 is waiting for allocation
+	require.Equal(t, task4, sm.taskWaitAlloc)
 
 	sm.free(taskID)
+	require.Len(t, sm.executorSlotInfos, 0)
 	require.Nil(t, sm.executorSlotInfos[taskID])
+
+	sm.alloc(task4)
+	require.Len(t, sm.executorSlotInfos, 1)
+	require.Equal(t, task4, sm.executorSlotInfos[taskID4])
+	require.Equal(t, 0, sm.available)
+	sm.free(taskID4)
+	require.Len(t, sm.executorSlotInfos, 0)
+	require.Nil(t, sm.executorSlotInfos[taskID4])
 }

From f703d67522fb0c5376710cddec7426baa34f7c5f Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Mon, 25 Dec 2023 17:59:30 +0800
Subject: [PATCH 2/9] fix comment: Refactor slotManager to improve task
 allocation efficiency

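The per-task map is replaced by a slice kept in reverse task order plus a
taskID2SlotIndex lookup, so canAlloc can walk the lower-priority tasks first
without sorting on every call. The taskWaitAlloc reservation is dropped:
canAlloc now returns canAlloc=true together with the tasks that must be freed,
and onRunnableTasks stops handling the remaining lower-priority tasks (break)
while it waits for those slots to come back.

Illustrative sketch only (hypothetical helper name): the ordering invariant
that alloc maintains after this patch.

    func sortByReverseTaskOrder(tasks []*proto.Task) {
        slices.SortFunc(tasks, func(a, b *proto.Task) int {
            // Task.Compare returns < 0 when its receiver ranks higher, so
            // swapping the operands puts lower-priority tasks first.
            return b.Compare(a)
        })
    }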
---
 .../framework/taskexecutor/manager.go         | 10 +++-
 pkg/disttask/framework/taskexecutor/slot.go   | 60 +++++++------------
 .../framework/taskexecutor/slot_test.go       | 48 +++++----------
 3 files changed, 45 insertions(+), 73 deletions(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index 8689ee21623f5..b8a3da4c9c127 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -90,7 +90,8 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
 		logCtx:    logutil.WithFields(context.Background()),
 		newPool:   b.newPool,
 		slotManager: &slotManager{
-			executorSlotInfos: make(map[int64]*proto.Task),
+			taskID2SlotIndex:  make(map[int64]int),
+			executorSlotInfos: make([]*proto.Task, 0),
 			available:         cpu.GetCPUCount(),
 		},
 	}
@@ -219,7 +220,12 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 		logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))
 
 		canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
-		m.onCanceledTasks(context.Background(), tasksNeedFree)
+		if tasksNeedFree != nil {
+			m.onCanceledTasks(context.Background(), tasksNeedFree)
+			// do not handle lower-priority tasks while the current task waits for others to be freed.
+			break
+		}
+
 		if !canAlloc {
 			logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
 			continue
diff --git a/pkg/disttask/framework/taskexecutor/slot.go b/pkg/disttask/framework/taskexecutor/slot.go
index c586fa23d26ca..9e70adf802438 100644
--- a/pkg/disttask/framework/taskexecutor/slot.go
+++ b/pkg/disttask/framework/taskexecutor/slot.go
@@ -24,23 +24,27 @@ import (
 // slotManager is used to manage the slots of the executor.
 type slotManager struct {
 	sync.RWMutex
-	executorSlotInfos map[int64]*proto.Task
+	// taskID2SlotIndex maps task ID to the task's index in executorSlotInfos
+	taskID2SlotIndex map[int64]int
+	// executorSlotInfos is used to record the tasks that are running on the executor.
+	executorSlotInfos []*proto.Task
 
 	// The number of slots that can be used by the executor.
 	// It is always equal to CPU cores of the instance.
 	available int
-
-	taskWaitAlloc *proto.Task
 }
 
 func (sm *slotManager) alloc(task *proto.Task) {
 	sm.Lock()
 	defer sm.Unlock()
-	if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) == 0 {
-		sm.taskWaitAlloc = nil
-	}
-	sm.executorSlotInfos[task.ID] = task
 
+	sm.executorSlotInfos = append(sm.executorSlotInfos, task)
+	slices.SortFunc(sm.executorSlotInfos, func(a, b *proto.Task) int {
+		return b.Compare(a)
+	})
+	for index, slotInfo := range sm.executorSlotInfos {
+		sm.taskID2SlotIndex[slotInfo.ID] = index
+	}
 	sm.available -= task.Concurrency
 }
 
@@ -48,10 +52,16 @@ func (sm *slotManager) free(taskID int64) {
 	sm.Lock()
 	defer sm.Unlock()
 
-	slotInfo, ok := sm.executorSlotInfos[taskID]
-	if ok {
-		delete(sm.executorSlotInfos, taskID)
-		sm.available += slotInfo.Concurrency
+	index, ok := sm.taskID2SlotIndex[taskID]
+	if !ok {
+		return
+	}
+	sm.available += sm.executorSlotInfos[index].Concurrency
+	sm.executorSlotInfos = append(sm.executorSlotInfos[:index], sm.executorSlotInfos[index+1:]...)
+
+	delete(sm.taskID2SlotIndex, taskID)
+	for index, slotInfo := range sm.executorSlotInfos {
+		sm.taskID2SlotIndex[slotInfo.ID] = index
 	}
 }
 
@@ -60,43 +70,19 @@ func (sm *slotManager) canAlloc(task *proto.Task) (canAlloc bool, tasksNeedFree
 	sm.RLock()
 	defer sm.RUnlock()
 
-	// If a task is already waiting for allocation, we can't allocate a lower-priority task.
-	if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) < 0 {
-		return false, nil
-	}
-
 	if sm.available >= task.Concurrency {
-		// If the upcoming task's priority is higher than that of the task waiting for
-		// allocation, clear that pending reservation so the upcoming task can take the slots.
-		if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) > 0 {
-			sm.taskWaitAlloc = nil
-		}
 		return true, nil
 	}
 
-	// If this task is already the one waiting for allocation, we do not need to free any tasks again.
-	if sm.taskWaitAlloc != nil && sm.taskWaitAlloc.Compare(task) == 0 {
-		return false, nil
-	}
-
-	allSlotInfos := make([]*proto.Task, 0, len(sm.executorSlotInfos))
-	for _, slotInfo := range sm.executorSlotInfos {
-		allSlotInfos = append(allSlotInfos, slotInfo)
-	}
-	slices.SortFunc(allSlotInfos, func(a, b *proto.Task) int {
-		return a.Compare(b)
-	})
-
 	usedSlots := 0
-	for _, slotInfo := range allSlotInfos {
+	for _, slotInfo := range sm.executorSlotInfos {
 		if slotInfo.Compare(task) < 0 {
 			continue
 		}
 		tasksNeedFree = append(tasksNeedFree, slotInfo)
 		usedSlots += slotInfo.Concurrency
 		if sm.available+usedSlots >= task.Concurrency {
-			sm.taskWaitAlloc = task
-			return false, tasksNeedFree
+			return true, tasksNeedFree
 		}
 	}
 
diff --git a/pkg/disttask/framework/taskexecutor/slot_test.go b/pkg/disttask/framework/taskexecutor/slot_test.go
index d3feaf7eab61b..237331970011a 100644
--- a/pkg/disttask/framework/taskexecutor/slot_test.go
+++ b/pkg/disttask/framework/taskexecutor/slot_test.go
@@ -23,7 +23,8 @@ import (
 
 func TestSlotManager(t *testing.T) {
 	sm := slotManager{
-		executorSlotInfos: make(map[int64]*proto.Task),
+		taskID2SlotIndex:  make(map[int64]int),
+		executorSlotInfos: make([]*proto.Task, 0),
 		available:         10,
 	}
 
@@ -31,7 +32,6 @@ func TestSlotManager(t *testing.T) {
 		taskID  = int64(1)
 		taskID2 = int64(2)
 		taskID3 = int64(3)
-		taskID4 = int64(4)
 		task    = &proto.Task{
 			ID:          taskID,
 			Priority:    1,
@@ -49,7 +49,7 @@ func TestSlotManager(t *testing.T) {
 	require.Nil(t, tasksNeedFree)
 	sm.alloc(task)
 	require.Len(t, sm.executorSlotInfos, 1)
-	require.Equal(t, task, sm.executorSlotInfos[taskID])
+	require.Equal(t, task, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID]])
 	require.Equal(t, 9, sm.available)
 
 	// the available slots are not enough for task2
@@ -60,62 +60,42 @@ func TestSlotManager(t *testing.T) {
 	// increase the priority of task2, task2 is waiting for allocation
 	task2.Priority = 0
 	canAlloc, tasksNeedFree = sm.canAlloc(task2)
-	require.False(t, canAlloc)
+	require.True(t, canAlloc)
 	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
-	require.Equal(t, task2, sm.taskWaitAlloc)
 
-	// task with lower priority is restricted
+	// task with higher priority
 	task3 := &proto.Task{
 		ID:          taskID3,
-		Priority:    3,
+		Priority:    -1,
 		Concurrency: 1,
 	}
-	canAlloc, tasksNeedFree = sm.canAlloc(task3)
-	require.False(t, canAlloc)
-	require.Nil(t, tasksNeedFree)
 
-	// increase the priority of task3, it can be allocated now
-	task3.Priority = -1
 	canAlloc, tasksNeedFree = sm.canAlloc(task3)
 	require.True(t, canAlloc)
 	require.Nil(t, tasksNeedFree)
 	// task3 is allocated ahead of the lower-priority task2
-	require.Nil(t, sm.taskWaitAlloc)
 	sm.alloc(task3)
 	require.Len(t, sm.executorSlotInfos, 2)
-	require.Equal(t, task3, sm.executorSlotInfos[taskID3])
+	require.Equal(t, task3, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID3]])
 	require.Equal(t, 8, sm.available)
 	sm.free(taskID3)
 	require.Len(t, sm.executorSlotInfos, 1)
-	require.Nil(t, sm.executorSlotInfos[taskID3])
+	require.Equal(t, task, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID]])
 
 	// task2 is waiting for allocation again
 	canAlloc, tasksNeedFree = sm.canAlloc(task2)
-	require.False(t, canAlloc)
-	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
-	require.Equal(t, task2, sm.taskWaitAlloc)
-
-	// task2 is occupied by task4
-	task4 := &proto.Task{
-		ID:          taskID4,
-		Priority:    -1,
-		Concurrency: 10,
-	}
-	canAlloc, tasksNeedFree = sm.canAlloc(task4)
-	require.False(t, canAlloc)
+	require.True(t, canAlloc)
 	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
-	// task 4 is waiting for allocation
-	require.Equal(t, task4, sm.taskWaitAlloc)
 
 	sm.free(taskID)
 	require.Len(t, sm.executorSlotInfos, 0)
-	require.Nil(t, sm.executorSlotInfos[taskID])
+	require.Len(t, sm.taskID2SlotIndex, 0)
 
-	sm.alloc(task4)
+	sm.alloc(task2)
 	require.Len(t, sm.executorSlotInfos, 1)
-	require.Equal(t, task4, sm.executorSlotInfos[taskID4])
+	require.Equal(t, task2, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID2]])
 	require.Equal(t, 0, sm.available)
-	sm.free(taskID4)
+	sm.free(taskID2)
 	require.Len(t, sm.executorSlotInfos, 0)
-	require.Nil(t, sm.executorSlotInfos[taskID4])
+	require.Len(t, sm.taskID2SlotIndex, 0)
 }

From 202ea217f1a8ce8e6ac760a14b36c0ef7a5d4b3d Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Tue, 26 Dec 2023 16:47:24 +0800
Subject: [PATCH 3/9] fix comment: cancel occupied subtask normally and add ut

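When a higher-priority task needs slots, the manager now cancels the occupying
executors through the per-task cancel funcs (cancelTasks) instead of reusing
onCanceledTasks, so the running subtasks are only paused, keep their state,
and can be picked up again later. The repeated unfinished-subtask state lists
are extracted into unfinishedSubtaskStates, and onRunnableTask returns early
when its task context has already been canceled.

Illustrative sketch only, mirroring the new unit test in manager_test.go: the
cancel func registered for a task is reused to pause its executor without
touching subtask state. taskID is a placeholder.

    m.addHandlingTask(taskID)
    ctx, cancel := context.WithCancelCause(context.Background())
    m.registerCancelFunc(taskID, cancel)
    // later, when the task's slots are needed by a higher-priority task:
    m.cancelTasks([]*proto.Task{{ID: taskID}})
    // ctx.Err() is now context.Canceled, but the subtasks were not marked
    // canceled, so the task can be re-dispatched once slots free up.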
---
 .../framework/taskexecutor/manager.go         |  48 +++--
 .../framework/taskexecutor/manager_test.go    | 188 ++++++++++++++----
 2 files changed, 185 insertions(+), 51 deletions(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index b8a3da4c9c127..a4cc1778a9f25 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -37,10 +37,15 @@ import (
 var (
 	executorPoolSize int32 = 4
 	// same as scheduler
-	checkTime           = 300 * time.Millisecond
-	recoverMetaInterval = 90 * time.Second
-	retrySQLTimes       = 30
-	retrySQLInterval    = 500 * time.Millisecond
+	checkTime               = 300 * time.Millisecond
+	recoverMetaInterval     = 90 * time.Second
+	retrySQLTimes           = 30
+	retrySQLInterval        = 500 * time.Millisecond
+	unfinishedSubtaskStates = []interface{}{
+		proto.TaskStatePending, proto.TaskStateRevertPending,
+		// for the case that TiDB is restarted while the subtask is running.
+		proto.TaskStateRunning, proto.TaskStateReverting,
+	}
 )
 
 // ManagerBuilder is used to build a Manager.
@@ -205,10 +210,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 	tasks = m.filterAlreadyHandlingTasks(tasks)
 
 	for _, task := range tasks {
-		exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step,
-			proto.TaskStatePending, proto.TaskStateRevertPending,
-			// for the case that the tidb is restarted when the subtask is running.
-			proto.TaskStateRunning, proto.TaskStateReverting)
+		exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step, unfinishedSubtaskStates...)
 		if err != nil {
 			logutil.Logger(m.logCtx).Error("check subtask exist failed", zap.Error(err))
 			m.logErr(err)
@@ -221,7 +223,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 
 		canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
 		if tasksNeedFree != nil {
-			m.onCanceledTasks(context.Background(), tasksNeedFree)
+			m.cancelTasks(tasksNeedFree)
 			// do not handle lower-priority tasks while the current task waits for others to be freed.
 			break
 		}
@@ -321,6 +323,19 @@ func (m *Manager) cancelAllRunningTasks() {
 	}
 }
 
+func (m *Manager) cancelTasks(tasks []*proto.Task) {
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+	for _, task := range tasks {
+		logutil.Logger(m.logCtx).Info("cancelTasks", zap.Any("task_id", task.ID))
+		if cancel, ok := m.mu.handlingTasks[task.ID]; ok && cancel != nil {
+			// Pause all running subtasks; do not mark them as canceled.
+			// The subtask state should not be changed.
+			cancel(nil)
+		}
+	}
+}
+
 // filterAlreadyHandlingTasks filters the tasks that are already handled.
 func (m *Manager) filterAlreadyHandlingTasks(tasks []*proto.Task) []*proto.Task {
 	m.mu.RLock()
@@ -383,7 +398,7 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
 				}
 			}()
 		})
-		task, err := m.taskTable.GetTaskByID(m.ctx, task.ID)
+		task, err = m.taskTable.GetTaskByID(m.ctx, task.ID)
 		if err != nil {
 			m.logErr(err)
 			return
@@ -396,12 +411,8 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
 				zap.Int64("task-id", task.ID), zap.Int64("step", int64(task.Step)), zap.Stringer("state", task.State))
 			return
 		}
-		if exist, err := m.taskTable.HasSubtasksInStates(
-			m.ctx,
-			m.id, task.ID, task.Step,
-			proto.TaskStatePending, proto.TaskStateRevertPending,
-			// for the case that the tidb is restarted when the subtask is running.
-			proto.TaskStateRunning, proto.TaskStateReverting); err != nil {
+		if exist, err := m.taskTable.HasSubtasksInStates(m.ctx, m.id, task.ID, task.Step,
+			unfinishedSubtaskStates...); err != nil {
 			m.logErr(err)
 			return
 		} else if !exist {
@@ -409,6 +420,11 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
 		}
 		switch task.State {
 		case proto.TaskStateRunning:
+			if taskCtx.Err() != nil {
+				logutil.Logger(m.logCtx).Info("onRunnableTask exit for taskCtx.Done",
+					zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type), zap.Error(taskCtx.Err()))
+				return
+			}
 			// use taskCtx for canceling.
 			err = executor.Run(taskCtx, task)
 		case proto.TaskStatePausing:
diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go
index 7427fa37fd3a7..0d03b49e001c4 100644
--- a/pkg/disttask/framework/taskexecutor/manager_test.go
+++ b/pkg/disttask/framework/taskexecutor/manager_test.go
@@ -29,11 +29,6 @@ import (
 	"go.uber.org/mock/gomock"
 )
 
-var unfinishedSubtaskStates = []interface{}{
-	proto.TaskStatePending, proto.TaskStateRevertPending,
-	proto.TaskStateRunning, proto.TaskStateReverting,
-}
-
 func getPoolRunFn() (*sync.WaitGroup, func(f func()) error) {
 	wg := &sync.WaitGroup{}
 	return wg, func(f func()) error {
@@ -78,6 +73,12 @@ func TestManageTask(t *testing.T) {
 	m.cancelAllRunningTasks()
 	require.Equal(t, context.Canceled, ctx1.Err())
 
+	m.addHandlingTask(2)
+	ctx1, cancel1 = context.WithCancelCause(context.Background())
+	m.registerCancelFunc(2, cancel1)
+	m.cancelTasks([]*proto.Task{{ID: 2}})
+	require.Equal(t, context.Canceled, ctx1.Err())
+
 	// test cancel.
 	m.addHandlingTask(1)
 	ctx2, cancel2 := context.WithCancelCause(context.Background())
@@ -272,32 +273,34 @@ func TestSlotManagerInManager(t *testing.T) {
 	require.NoError(t, err)
 	m.slotManager.available = 10
 
-	taskID1 := int64(1)
-	taskID2 := int64(2)
-
-	now := time.Now()
-
-	task1 := &proto.Task{
-		ID:          taskID1,
-		State:       proto.TaskStateRunning,
-		CreateTime:  now,
-		Concurrency: 10,
-		Step:        proto.StepOne,
-		Type:        "type",
-	}
-	task2 := &proto.Task{
-		ID:          taskID2,
-		State:       proto.TaskStateRunning,
-		CreateTime:  now,
-		Concurrency: 1,
-		Step:        proto.StepOne,
-		Type:        "type",
-	}
-
-	ch := make(chan struct{})
-
+	var (
+		taskID1 = int64(1)
+		taskID2 = int64(2)
+
+		task1 = &proto.Task{
+			ID:          taskID1,
+			State:       proto.TaskStateRunning,
+			Concurrency: 10,
+			Step:        proto.StepOne,
+			Type:        "type",
+		}
+		task2 = &proto.Task{
+			ID:          taskID2,
+			State:       proto.TaskStateRunning,
+			Concurrency: 1,
+			Step:        proto.StepOne,
+			Type:        "type",
+		}
+	)
+
+	ch := make(chan error)
+	defer close(ch)
 	wg, runFn := getPoolRunFn()
 
+	// ******** Test task1 alloc success ********
+	// 1. task1 alloc success
+	// 2. task2 alloc failed
+	// 3. task1 run success
 	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
 		unfinishedSubtaskStates...).
 		Return(true, nil)
@@ -314,8 +317,7 @@ func TestSlotManagerInManager(t *testing.T) {
 		Return(true, nil)
 	// task1 start running
 	mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
-		<-ch
-		return nil
+		return <-ch
 	})
 
 	m.onRunnableTasks([]*proto.Task{task1, task2})
@@ -323,17 +325,133 @@ func TestSlotManagerInManager(t *testing.T) {
 
 	// task1 alloc resource success
 	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, map[int64]*proto.Task{
-		taskID1: task1,
-	}, m.slotManager.executorSlotInfos)
-	ch <- struct{}{}
+	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorSlotInfos)
+	ch <- nil
 
 	// task1 succeed
 	task1.State = proto.TaskStateSucceed
 	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
 	mockInternalExecutor.EXPECT().Close()
+	wg.Wait()
+	require.Equal(t, 10, m.slotManager.available)
+	require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
+	require.True(t, ctrl.Satisfied())
+
+	// ******** Test task occupation ********
+	task1.State = proto.TaskStateRunning
+	var (
+		taskID3 = int64(3)
+		task3   = &proto.Task{
+			ID:          taskID3,
+			State:       proto.TaskStateRunning,
+			Concurrency: 1,
+			Priority:    -1,
+			Step:        proto.StepOne,
+			Type:        "type",
+		}
+	)
+	// 1. task1 alloc success
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+
+	// mock inside onRunnableTask
+	mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	// task1 start running
+	mockInternalExecutor.EXPECT().Run(gomock.Any(), task1).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
+		return <-ch
+	})
+
+	m.onRunnableTasks([]*proto.Task{task1, task2})
+	time.Sleep(2 * time.Second)
+	// task1 alloc resource success
+	require.Equal(t, 0, m.slotManager.available)
+	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorSlotInfos)
+	require.True(t, ctrl.Satisfied())
 
+	// 2. task1 is occupied by task3, task1 starts pausing
+	// 3. task3 is waiting for task1 to be released, and task2 can't be allocated
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+
+	// the priority of task3 is higher than that of task2, so task3 comes before task2 in the list
+	m.onRunnableTasks([]*proto.Task{task3, task2})
+	time.Sleep(2 * time.Second)
+	require.Equal(t, 0, m.slotManager.available)
+	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorSlotInfos)
+	require.True(t, ctrl.Satisfied())
+
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockInternalExecutor.EXPECT().Close()
+
+	// 4. task1 is released, task3 alloc success, start to run
+	ch <- context.Canceled
+	time.Sleep(time.Second)
+	require.Equal(t, 10, m.slotManager.available)
+	require.Len(t, m.slotManager.executorSlotInfos, 0)
+	require.True(t, ctrl.Satisfied())
+
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID1, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockPool.EXPECT().Run(gomock.Any()).DoAndReturn(runFn)
+
+	// mock inside onRunnableTask
+	mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID3).Return(task3, nil)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockInternalExecutor.EXPECT().Run(gomock.Any(), task3).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
+		return <-ch
+	})
+
+	// 5. available slots are enough, task2 alloc success
+	mockInternalExecutor.EXPECT().Init(gomock.Any()).Return(nil)
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
+	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID2, proto.StepOne,
+		unfinishedSubtaskStates...).
+		Return(true, nil)
+	mockInternalExecutor.EXPECT().Run(gomock.Any(), task2).DoAndReturn(func(_ context.Context, _ *proto.Task) error {
+		return <-ch
+	})
+
+	m.onRunnableTasks([]*proto.Task{task3, task1, task2})
+	time.Sleep(2 * time.Second)
+	require.Equal(t, 8, m.slotManager.available)
+	require.Equal(t, 2, len(m.slotManager.executorSlotInfos))
+	require.True(t, ctrl.Satisfied())
+
+	// 6. task3/task2 run success
+	task3.State = proto.TaskStateSucceed
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID3).Return(task3, nil)
+	mockInternalExecutor.EXPECT().Close()
+	task2.State = proto.TaskStateSucceed
+	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID2).Return(task2, nil)
+	mockInternalExecutor.EXPECT().Close()
+	ch <- nil
+	ch <- nil
 	wg.Wait()
 	require.Equal(t, 10, m.slotManager.available)
 	require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
+	require.True(t, ctrl.Satisfied())
 }

From eee0420fa0c9207f3bdee8a9fe974e0ee2a6d070 Mon Sep 17 00:00:00 2001
From: okJiang <jiangxianjie@pingcap.com>
Date: Tue, 26 Dec 2023 16:51:57 +0800
Subject: [PATCH 4/9] Update pkg/disttask/framework/taskexecutor/manager.go

Co-authored-by: D3Hunter <jujj603@gmail.com>
---
 pkg/disttask/framework/taskexecutor/manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index a4cc1778a9f25..e9785aa7fede5 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -222,7 +222,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 		logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))
 
 		canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
-		if tasksNeedFree != nil {
+		if len(tasksNeedFree) > 0 {
 			m.cancelTasks(tasksNeedFree)
 			// do not handle lower-priority tasks while the current task waits for others to be freed.
 			break

From 8cff09a849e39311359c0538ea587f980055b890 Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Wed, 27 Dec 2023 14:11:13 +0800
Subject: [PATCH 5/9] fix comment

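Mostly renames for clarity (executorTasks, taskID2Index, cancelTaskExecutors)
plus two tweaks: the rejected-task log is lowered to Debug, and the scan in
canAlloc now stops at the first running task that outranks the incoming one,
which is safe because executorTasks is kept in reverse task order.

Illustrative sketch only (hypothetical helper): why the early break is valid
once the slice is sorted lowest-priority-first.

    // freeableSlots returns how many slots could be reclaimed for `incoming`
    // by stopping lower-priority tasks, scanning a slice that is already
    // sorted in reverse task order.
    func freeableSlots(sortedTasks []*proto.Task, incoming *proto.Task) int {
        n := 0
        for _, t := range sortedTasks {
            if t.Compare(incoming) < 0 {
                break // t and everything after it outranks `incoming`
            }
            n += t.Concurrency
        }
        return n
    }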
---
 .../framework/taskexecutor/manager.go         | 12 +--
 .../framework/taskexecutor/manager_test.go    | 16 ++--
 pkg/disttask/framework/taskexecutor/slot.go   | 35 ++++-----
 .../framework/taskexecutor/slot_test.go       | 73 ++++++++++++-------
 4 files changed, 80 insertions(+), 56 deletions(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index a4cc1778a9f25..4273294d3a80e 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -95,9 +95,9 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
 		logCtx:    logutil.WithFields(context.Background()),
 		newPool:   b.newPool,
 		slotManager: &slotManager{
-			taskID2SlotIndex:  make(map[int64]int),
-			executorSlotInfos: make([]*proto.Task, 0),
-			available:         cpu.GetCPUCount(),
+			taskID2Index:  make(map[int64]int),
+			executorTasks: make([]*proto.Task, 0),
+			available:     cpu.GetCPUCount(),
 		},
 	}
 	m.ctx, m.cancel = context.WithCancel(ctx)
@@ -223,13 +223,13 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 
 		canAlloc, tasksNeedFree := m.slotManager.canAlloc(task)
 		if len(tasksNeedFree) > 0 {
-			m.cancelTasks(tasksNeedFree)
+			m.cancelTaskExecutors(tasksNeedFree)
 			// do not handle lower-priority tasks while the current task waits for others to be freed.
 			break
 		}
 
 		if !canAlloc {
-			logutil.Logger(m.logCtx).Warn("subtask has been rejected", zap.Int64("task-id", task.ID))
+			logutil.Logger(m.logCtx).Debug("subtask has been rejected", zap.Int64("task-id", task.ID))
 			continue
 		}
 		m.addHandlingTask(task.ID)
@@ -323,7 +323,7 @@ func (m *Manager) cancelAllRunningTasks() {
 	}
 }
 
-func (m *Manager) cancelTasks(tasks []*proto.Task) {
+func (m *Manager) cancelTaskExecutors(tasks []*proto.Task) {
 	m.mu.RLock()
 	defer m.mu.RUnlock()
 	for _, task := range tasks {
diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go
index 0d03b49e001c4..502a88839a7cf 100644
--- a/pkg/disttask/framework/taskexecutor/manager_test.go
+++ b/pkg/disttask/framework/taskexecutor/manager_test.go
@@ -76,7 +76,7 @@ func TestManageTask(t *testing.T) {
 	m.addHandlingTask(2)
 	ctx1, cancel1 = context.WithCancelCause(context.Background())
 	m.registerCancelFunc(2, cancel1)
-	m.cancelTasks([]*proto.Task{{ID: 2}})
+	m.cancelTaskExecutors([]*proto.Task{{ID: 2}})
 	require.Equal(t, context.Canceled, ctx1.Err())
 
 	// test cancel.
@@ -325,7 +325,7 @@ func TestSlotManagerInManager(t *testing.T) {
 
 	// task1 alloc resource success
 	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorSlotInfos)
+	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
 	ch <- nil
 
 	// task1 succeed
@@ -334,7 +334,7 @@ func TestSlotManagerInManager(t *testing.T) {
 	mockInternalExecutor.EXPECT().Close()
 	wg.Wait()
 	require.Equal(t, 10, m.slotManager.available)
-	require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
+	require.Equal(t, 0, len(m.slotManager.executorTasks))
 	require.True(t, ctrl.Satisfied())
 
 	// ******** Test task occupation ********
@@ -374,7 +374,7 @@ func TestSlotManagerInManager(t *testing.T) {
 	time.Sleep(2 * time.Second)
 	// task1 alloc resource success
 	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorSlotInfos)
+	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
 	require.True(t, ctrl.Satisfied())
 
 	// 2. task1 is occupied by task3, task1 starts pausing
@@ -387,7 +387,7 @@ func TestSlotManagerInManager(t *testing.T) {
 	m.onRunnableTasks([]*proto.Task{task3, task2})
 	time.Sleep(2 * time.Second)
 	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorSlotInfos)
+	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
 	require.True(t, ctrl.Satisfied())
 
 	mockTaskTable.EXPECT().GetTaskByID(m.ctx, taskID1).Return(task1, nil)
@@ -400,7 +400,7 @@ func TestSlotManagerInManager(t *testing.T) {
 	ch <- context.Canceled
 	time.Sleep(time.Second)
 	require.Equal(t, 10, m.slotManager.available)
-	require.Len(t, m.slotManager.executorSlotInfos, 0)
+	require.Len(t, m.slotManager.executorTasks, 0)
 	require.True(t, ctrl.Satisfied())
 
 	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
@@ -438,7 +438,7 @@ func TestSlotManagerInManager(t *testing.T) {
 	m.onRunnableTasks([]*proto.Task{task3, task1, task2})
 	time.Sleep(2 * time.Second)
 	require.Equal(t, 8, m.slotManager.available)
-	require.Equal(t, 2, len(m.slotManager.executorSlotInfos))
+	require.Equal(t, 2, len(m.slotManager.executorTasks))
 	require.True(t, ctrl.Satisfied())
 
 	// 6. task3/task2 run success
@@ -452,6 +452,6 @@ func TestSlotManagerInManager(t *testing.T) {
 	ch <- nil
 	wg.Wait()
 	require.Equal(t, 10, m.slotManager.available)
-	require.Equal(t, 0, len(m.slotManager.executorSlotInfos))
+	require.Equal(t, 0, len(m.slotManager.executorTasks))
 	require.True(t, ctrl.Satisfied())
 }
diff --git a/pkg/disttask/framework/taskexecutor/slot.go b/pkg/disttask/framework/taskexecutor/slot.go
index 9e70adf802438..6f78f6e0d144b 100644
--- a/pkg/disttask/framework/taskexecutor/slot.go
+++ b/pkg/disttask/framework/taskexecutor/slot.go
@@ -24,13 +24,14 @@ import (
 // slotManager is used to manage the slots of the executor.
 type slotManager struct {
 	sync.RWMutex
-	// taskID2SlotIndex maps task ID to the task's index in executorSlotInfos
-	taskID2SlotIndex map[int64]int
-	// executorSlotInfos is used to record the tasks that are running on the executor.
-	executorSlotInfos []*proto.Task
+	// taskID2Index maps task ID to the task's index in executorTasks
+	taskID2Index map[int64]int
+	// executorTasks is used to record the tasks that are running on the executor,
+	// the slice is sorted in reverse task order (lower-priority tasks first).
+	executorTasks []*proto.Task
 
 	// The number of slots that can be used by the executor.
-	// It is always equal to CPU cores of the instance.
+	// Its initial value is always equal to the number of CPU cores of the instance.
 	available int
 }
 
@@ -38,12 +39,12 @@ func (sm *slotManager) alloc(task *proto.Task) {
 	sm.Lock()
 	defer sm.Unlock()
 
-	sm.executorSlotInfos = append(sm.executorSlotInfos, task)
-	slices.SortFunc(sm.executorSlotInfos, func(a, b *proto.Task) int {
+	sm.executorTasks = append(sm.executorTasks, task)
+	slices.SortFunc(sm.executorTasks, func(a, b *proto.Task) int {
 		return b.Compare(a)
 	})
-	for index, slotInfo := range sm.executorSlotInfos {
-		sm.taskID2SlotIndex[slotInfo.ID] = index
+	for index, slotInfo := range sm.executorTasks {
+		sm.taskID2Index[slotInfo.ID] = index
 	}
 	sm.available -= task.Concurrency
 }
@@ -52,16 +53,16 @@ func (sm *slotManager) free(taskID int64) {
 	sm.Lock()
 	defer sm.Unlock()
 
-	index, ok := sm.taskID2SlotIndex[taskID]
+	index, ok := sm.taskID2Index[taskID]
 	if !ok {
 		return
 	}
-	sm.available += sm.executorSlotInfos[index].Concurrency
-	sm.executorSlotInfos = append(sm.executorSlotInfos[:index], sm.executorSlotInfos[index+1:]...)
+	sm.available += sm.executorTasks[index].Concurrency
+	sm.executorTasks = append(sm.executorTasks[:index], sm.executorTasks[index+1:]...)
 
-	delete(sm.taskID2SlotIndex, taskID)
-	for index, slotInfo := range sm.executorSlotInfos {
-		sm.taskID2SlotIndex[slotInfo.ID] = index
+	delete(sm.taskID2Index, taskID)
+	for index, slotInfo := range sm.executorTasks {
+		sm.taskID2Index[slotInfo.ID] = index
 	}
 }
 
@@ -75,9 +76,9 @@ func (sm *slotManager) canAlloc(task *proto.Task) (canAlloc bool, tasksNeedFree
 	}
 
 	usedSlots := 0
-	for _, slotInfo := range sm.executorSlotInfos {
+	for _, slotInfo := range sm.executorTasks {
 		if slotInfo.Compare(task) < 0 {
-			continue
+			break
 		}
 		tasksNeedFree = append(tasksNeedFree, slotInfo)
 		usedSlots += slotInfo.Concurrency
diff --git a/pkg/disttask/framework/taskexecutor/slot_test.go b/pkg/disttask/framework/taskexecutor/slot_test.go
index 237331970011a..35b235834f508 100644
--- a/pkg/disttask/framework/taskexecutor/slot_test.go
+++ b/pkg/disttask/framework/taskexecutor/slot_test.go
@@ -23,22 +23,19 @@ import (
 
 func TestSlotManager(t *testing.T) {
 	sm := slotManager{
-		taskID2SlotIndex:  make(map[int64]int),
-		executorSlotInfos: make([]*proto.Task, 0),
-		available:         10,
+		taskID2Index:  make(map[int64]int),
+		executorTasks: make([]*proto.Task, 0),
+		available:     10,
 	}
 
 	var (
-		taskID  = int64(1)
-		taskID2 = int64(2)
-		taskID3 = int64(3)
-		task    = &proto.Task{
-			ID:          taskID,
+		task = &proto.Task{
+			ID:          1,
 			Priority:    1,
 			Concurrency: 1,
 		}
 		task2 = &proto.Task{
-			ID:          taskID2,
+			ID:          2,
 			Priority:    2,
 			Concurrency: 10,
 		}
@@ -48,8 +45,8 @@ func TestSlotManager(t *testing.T) {
 	require.True(t, canAlloc)
 	require.Nil(t, tasksNeedFree)
 	sm.alloc(task)
-	require.Len(t, sm.executorSlotInfos, 1)
-	require.Equal(t, task, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID]])
+	require.Len(t, sm.executorTasks, 1)
+	require.Equal(t, task, sm.executorTasks[sm.taskID2Index[task.ID]])
 	require.Equal(t, 9, sm.available)
 
 	// the available slots are not enough for task2
@@ -65,7 +62,7 @@ func TestSlotManager(t *testing.T) {
 
 	// task with higher priority
 	task3 := &proto.Task{
-		ID:          taskID3,
+		ID:          3,
 		Priority:    -1,
 		Concurrency: 1,
 	}
@@ -75,27 +72,53 @@ func TestSlotManager(t *testing.T) {
 	require.Nil(t, tasksNeedFree)
 	// task3 is allocated ahead of the lower-priority task2
 	sm.alloc(task3)
-	require.Len(t, sm.executorSlotInfos, 2)
-	require.Equal(t, task3, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID3]])
+	require.Len(t, sm.executorTasks, 2)
+	require.Equal(t, task3, sm.executorTasks[sm.taskID2Index[task3.ID]])
 	require.Equal(t, 8, sm.available)
-	sm.free(taskID3)
-	require.Len(t, sm.executorSlotInfos, 1)
-	require.Equal(t, task, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID]])
+
+	task4 := &proto.Task{
+		ID:          4,
+		Priority:    1,
+		Concurrency: 1,
+	}
+	canAlloc, tasksNeedFree = sm.canAlloc(task4)
+	require.True(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
+	sm.alloc(task4)
+	task5 := &proto.Task{
+		ID:          5,
+		Priority:    0,
+		Concurrency: 1,
+	}
+	canAlloc, tasksNeedFree = sm.canAlloc(task5)
+	require.True(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
+	sm.alloc(task5)
+	require.Len(t, sm.executorTasks, 4)
+	// test the order of executorTasks
+	require.Equal(t, []*proto.Task{task4, task, task5, task3}, sm.executorTasks)
+	require.Equal(t, 6, sm.available)
+	sm.free(task4.ID)
+	sm.free(task5.ID)
+
+	sm.free(task3.ID)
+	require.Len(t, sm.executorTasks, 1)
+	require.Equal(t, task, sm.executorTasks[sm.taskID2Index[task.ID]])
 
 	// task2 is waiting for allocation again
 	canAlloc, tasksNeedFree = sm.canAlloc(task2)
 	require.True(t, canAlloc)
 	require.Equal(t, []*proto.Task{task}, tasksNeedFree)
 
-	sm.free(taskID)
-	require.Len(t, sm.executorSlotInfos, 0)
-	require.Len(t, sm.taskID2SlotIndex, 0)
+	sm.free(task.ID)
+	require.Len(t, sm.executorTasks, 0)
+	require.Len(t, sm.taskID2Index, 0)
 
 	sm.alloc(task2)
-	require.Len(t, sm.executorSlotInfos, 1)
-	require.Equal(t, task2, sm.executorSlotInfos[sm.taskID2SlotIndex[taskID2]])
+	require.Len(t, sm.executorTasks, 1)
+	require.Equal(t, task2, sm.executorTasks[sm.taskID2Index[task2.ID]])
 	require.Equal(t, 0, sm.available)
-	sm.free(taskID2)
-	require.Len(t, sm.executorSlotInfos, 0)
-	require.Len(t, sm.taskID2SlotIndex, 0)
+	sm.free(task2.ID)
+	require.Len(t, sm.executorTasks, 0)
+	require.Len(t, sm.taskID2Index, 0)
 }

From 728629cd5f4057cb703b8a8c68a38a41635992c4 Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Wed, 27 Dec 2023 14:18:39 +0800
Subject: [PATCH 6/9] fix comment: add test case

---
 pkg/disttask/framework/taskexecutor/slot_test.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/pkg/disttask/framework/taskexecutor/slot_test.go b/pkg/disttask/framework/taskexecutor/slot_test.go
index 35b235834f508..d2c57d928e618 100644
--- a/pkg/disttask/framework/taskexecutor/slot_test.go
+++ b/pkg/disttask/framework/taskexecutor/slot_test.go
@@ -98,6 +98,19 @@ func TestSlotManager(t *testing.T) {
 	// test the order of executorTasks
 	require.Equal(t, []*proto.Task{task4, task, task5, task3}, sm.executorTasks)
 	require.Equal(t, 6, sm.available)
+
+	task6 := &proto.Task{
+		ID:          6,
+		Priority:    0,
+		Concurrency: 8,
+	}
+	canAlloc, tasksNeedFree = sm.canAlloc(task6)
+	require.True(t, canAlloc)
+	require.Equal(t, []*proto.Task{task4, task}, tasksNeedFree)
+	task6.Concurrency++
+	canAlloc, tasksNeedFree = sm.canAlloc(task6)
+	require.False(t, canAlloc)
+	require.Nil(t, tasksNeedFree)
 	sm.free(task4.ID)
 	sm.free(task5.ID)
 

From 4b140b90201266c6e8925298e01c454bb55f142d Mon Sep 17 00:00:00 2001
From: okJiang <819421878@qq.com>
Date: Thu, 28 Dec 2023 15:24:28 +0800
Subject: [PATCH 7/9] fix comment: optimize unit test

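The test no longer relies on fixed sleeps alone: it polls with
require.Eventually until the slot bookkeeping and the gomock expectations
registered so far are both satisfied, and waits on the worker pool with
wg.Wait() after the executor is released.

Illustrative sketch only: the polling pattern used below in place of sleeps.

    require.Eventually(t, func() bool {
        // poll until the slot bookkeeping reaches the expected state and
        // all gomock expectations registered so far have been consumed
        return m.slotManager.available == 0 &&
            len(m.slotManager.executorTasks) == 1 &&
            ctrl.Satisfied()
    }, 2*time.Second, 300*time.Millisecond)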
---
 .../framework/taskexecutor/manager_test.go    | 34 ++++++++++++-------
 1 file changed, 21 insertions(+), 13 deletions(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go
index 502a88839a7cf..e1cc6f4e7ce90 100644
--- a/pkg/disttask/framework/taskexecutor/manager_test.go
+++ b/pkg/disttask/framework/taskexecutor/manager_test.go
@@ -321,11 +321,14 @@ func TestSlotManagerInManager(t *testing.T) {
 	})
 
 	m.onRunnableTasks([]*proto.Task{task1, task2})
-	time.Sleep(2 * time.Second)
-
 	// task1 alloc resource success
-	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
+	require.Eventually(t, func() bool {
+		if m.slotManager.available != 0 || len(m.slotManager.executorTasks) != 1 ||
+			m.slotManager.executorTasks[0].ID != task1.ID {
+			return false
+		}
+		return ctrl.Satisfied()
+	}, 2*time.Second, 300*time.Millisecond)
 	ch <- nil
 
 	// task1 succeed
@@ -371,11 +374,14 @@ func TestSlotManagerInManager(t *testing.T) {
 	})
 
 	m.onRunnableTasks([]*proto.Task{task1, task2})
-	time.Sleep(2 * time.Second)
 	// task1 alloc resource success
-	require.Equal(t, 0, m.slotManager.available)
-	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
-	require.True(t, ctrl.Satisfied())
+	require.Eventually(t, func() bool {
+		if m.slotManager.available != 0 || len(m.slotManager.executorTasks) != 1 ||
+			m.slotManager.executorTasks[0].ID != task1.ID {
+			return false
+		}
+		return ctrl.Satisfied()
+	}, 2*time.Second, 300*time.Millisecond)
 
 	// 2. task1 is occupied by task3, task1 starts pausing
 	// 3. task3 is waiting for task1 to be released, and task2 can't be allocated
@@ -385,7 +391,6 @@ func TestSlotManagerInManager(t *testing.T) {
 
 	// the priority of task3 is higher than that of task2, so task3 comes before task2 in the list
 	m.onRunnableTasks([]*proto.Task{task3, task2})
-	time.Sleep(2 * time.Second)
 	require.Equal(t, 0, m.slotManager.available)
 	require.Equal(t, []*proto.Task{task1}, m.slotManager.executorTasks)
 	require.True(t, ctrl.Satisfied())
@@ -398,7 +403,7 @@ func TestSlotManagerInManager(t *testing.T) {
 
 	// 4. task1 is released, task3 alloc success, start to run
 	ch <- context.Canceled
-	time.Sleep(time.Second)
+	wg.Wait()
 	require.Equal(t, 10, m.slotManager.available)
 	require.Len(t, m.slotManager.executorTasks, 0)
 	require.True(t, ctrl.Satisfied())
@@ -437,9 +442,12 @@ func TestSlotManagerInManager(t *testing.T) {
 
 	m.onRunnableTasks([]*proto.Task{task3, task1, task2})
 	time.Sleep(2 * time.Second)
-	require.Equal(t, 8, m.slotManager.available)
-	require.Equal(t, 2, len(m.slotManager.executorTasks))
-	require.True(t, ctrl.Satisfied())
+	require.Eventually(t, func() bool {
+		if m.slotManager.available != 8 || len(m.slotManager.executorTasks) != 2 {
+			return false
+		}
+		return ctrl.Satisfied()
+	}, 2*time.Second, 300*time.Millisecond)
 
 	// 6. task3/task2 run success
 	task3.State = proto.TaskStateSucceed

From 1109df9654df2a2327b2b015bb80370e9c8c9d6a Mon Sep 17 00:00:00 2001
From: okJiang <jiangxianjie@pingcap.com>
Date: Thu, 28 Dec 2023 15:55:25 +0800
Subject: [PATCH 8/9] Update pkg/disttask/framework/taskexecutor/manager.go

---
 pkg/disttask/framework/taskexecutor/manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index 91fce70541a23..f80e418f756d4 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -421,7 +421,7 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
 		switch task.State {
 		case proto.TaskStateRunning:
 			if taskCtx.Err() != nil {
-				logutil.Logger(m.logCtx).Info("onRunnableTask exit for taskCtx.Done",
+				logutil.Logger(m.logCtx).Debug("onRunnableTask exit for taskCtx.Done",
 					zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type), zap.Error(taskCtx.Err()))
 				return
 			}

From 47a7e05027d6917019da7a97f64ae8b12e3f7b35 Mon Sep 17 00:00:00 2001
From: okJiang <jiangxianjie@pingcap.com>
Date: Thu, 28 Dec 2023 16:17:46 +0800
Subject: [PATCH 9/9] Apply suggestions from code review

Co-authored-by: D3Hunter <jujj603@gmail.com>
---
 pkg/disttask/framework/taskexecutor/manager.go      | 4 +---
 pkg/disttask/framework/taskexecutor/manager_test.go | 2 +-
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/pkg/disttask/framework/taskexecutor/manager.go b/pkg/disttask/framework/taskexecutor/manager.go
index f80e418f756d4..049623186b614 100644
--- a/pkg/disttask/framework/taskexecutor/manager.go
+++ b/pkg/disttask/framework/taskexecutor/manager.go
@@ -229,7 +229,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
 		}
 
 		if !canAlloc {
-			logutil.Logger(m.logCtx).Debug("subtask has been rejected", zap.Int64("task-id", task.ID))
+			logutil.Logger(m.logCtx).Debug("no enough slots to run task", zap.Int64("task-id", task.ID))
 			continue
 		}
 		m.addHandlingTask(task.ID)
@@ -421,8 +421,6 @@ func (m *Manager) onRunnableTask(task *proto.Task) {
 		switch task.State {
 		case proto.TaskStateRunning:
 			if taskCtx.Err() != nil {
-				logutil.Logger(m.logCtx).Debug("onRunnableTask exit for taskCtx.Done",
-					zap.Int64("task-id", task.ID), zap.Stringer("type", task.Type), zap.Error(taskCtx.Err()))
 				return
 			}
 			// use taskCtx for canceling.
diff --git a/pkg/disttask/framework/taskexecutor/manager_test.go b/pkg/disttask/framework/taskexecutor/manager_test.go
index e1cc6f4e7ce90..17b7965a19748 100644
--- a/pkg/disttask/framework/taskexecutor/manager_test.go
+++ b/pkg/disttask/framework/taskexecutor/manager_test.go
@@ -383,7 +383,7 @@ func TestSlotManagerInManager(t *testing.T) {
 		return ctrl.Satisfied()
 	}, 2*time.Second, 300*time.Millisecond)
 
-	// 2. task1 is occupied by task3, task1 starts pausing
+	// 2. task1 is preempted by task3, task1 starts pausing
 	// 3. task3 is waiting for task1 to be released, and task2 can't be allocated
 	mockTaskTable.EXPECT().HasSubtasksInStates(m.ctx, id, taskID3, proto.StepOne,
 		unfinishedSubtaskStates...).