Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
okJiang committed Dec 19, 2023
1 parent 3781105 commit 6392faf
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 14 deletions.
9 changes: 4 additions & 5 deletions pkg/disttask/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (b *ManagerBuilder) BuildManager(ctx context.Context, id string, taskTable
logCtx: logutil.WithFields(context.Background()),
newPool: b.newPool,
slotManager: &slotManager{
executorSlotInfos: make(map[int64]*slotInfo, 0),
executorSlotInfos: make(map[int64]*slotInfo),
available: cpu.GetCPUCount(),
},
}
Expand Down Expand Up @@ -222,7 +222,7 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
}
logutil.Logger(m.logCtx).Info("detect new subtask", zap.Int64("task-id", task.ID))

if !m.slotManager.canReserve(task) {
if !m.slotManager.canAlloc(task) {
failpoint.Inject("taskTick", func() {
<-onRunnableTasksTick
})
Expand All @@ -231,14 +231,13 @@ func (m *Manager) onRunnableTasks(tasks []*proto.Task) {
m.addHandlingTask(task.ID)
t := task
err = m.executorPool.Run(func() {
m.slotManager.reserve(t)
defer m.slotManager.unReserve(t.ID)
m.slotManager.alloc(t)
defer m.slotManager.free(t.ID)
m.onRunnableTask(t)
m.removeHandlingTask(t.ID)
})
// pool closed.
if err != nil {
m.slotManager.unReserve(t.ID)
m.removeHandlingTask(task.ID)
m.logErr(err)
return
Expand Down
10 changes: 5 additions & 5 deletions pkg/disttask/framework/taskexecutor/slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type slotInfo struct {
slotCount int
}

func (sm *slotManager) reserve(task *proto.Task) {
func (sm *slotManager) alloc(task *proto.Task) {
sm.Lock()
defer sm.Unlock()
sm.executorSlotInfos[task.ID] = &slotInfo{
Expand All @@ -50,7 +50,7 @@ func (sm *slotManager) reserve(task *proto.Task) {
sm.available -= task.Concurrency
}

func (sm *slotManager) unReserve(taskID int64) {
func (sm *slotManager) free(taskID int64) {
sm.Lock()
defer sm.Unlock()

Expand All @@ -62,9 +62,9 @@ func (sm *slotManager) unReserve(taskID int64) {
}

// canReserve is used to check whether the instance has enough slots to run the task.
func (sm *slotManager) canReserve(task *proto.Task) bool {
sm.Lock()
defer sm.Unlock()
func (sm *slotManager) canAlloc(task *proto.Task) bool {
sm.RLock()
defer sm.RUnlock()

return sm.available >= task.Concurrency
}
8 changes: 4 additions & 4 deletions pkg/disttask/framework/taskexecutor/slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ func TestSlotManager(t *testing.T) {
Priority: 1,
Concurrency: 1,
}
require.True(t, sm.canReserve(task))
sm.reserve(task)
require.True(t, sm.canAlloc(task))
sm.alloc(task)
require.Equal(t, 1, sm.executorSlotInfos[taskID].priority)
require.Equal(t, 1, sm.executorSlotInfos[taskID].slotCount)
require.Equal(t, 9, sm.available)

require.False(t, sm.canReserve(&proto.Task{
require.False(t, sm.canAlloc(&proto.Task{
ID: taskID2,
Priority: 2,
Concurrency: 10,
}))

sm.unReserve(taskID)
sm.free(taskID)
require.Nil(t, sm.executorSlotInfos[taskID])
}

0 comments on commit 6392faf

Please sign in to comment.