Skip to content

Commit

Permalink
resourcemange: fix delete more workers than expected (#40894)
Browse files Browse the repository at this point in the history
close #40893
  • Loading branch information
hawkingrei authored Feb 2, 2023
1 parent e8c32ae commit 337af61
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 20 deletions.
7 changes: 6 additions & 1 deletion resourcemanager/pooltask/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type TaskManager[T any, U any, C any, CT any, TF Context[CT]] struct {
concurrency int32
}

// NewTaskManager create a new pooltask manager.
// NewTaskManager create a new pool task manager.
func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskManager[T, U, C, CT, TF] {
task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
for i := 0; i < shard; i++ {
Expand Down Expand Up @@ -148,3 +148,8 @@ func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) {
}
}
}

// GetOriginConcurrency return the concurrency of the pool at the init.
func (t *TaskManager[T, U, C, CT, TF]) GetOriginConcurrency() int32 {
return t.concurrency
}
13 changes: 8 additions & 5 deletions resourcemanager/pooltask/task_manager_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
package pooltask

// Overclock is to increase the concurrency of pool.
func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
if t.concurrency > t.running.Load() {
return t.getBoostTask()
func (t *TaskManager[T, U, C, CT, TF]) Overclock(capacity int) (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
if t.running.Load() >= int32(capacity) {
return
}
return 0, nil
return t.getBoostTask()
}

// Downclock is to decrease the concurrency of pool.
func (t *TaskManager[T, U, C, CT, TF]) Downclock() {
func (t *TaskManager[T, U, C, CT, TF]) Downclock(capacity int) {
if t.running.Load() <= int32(capacity) {
return
}
t.pauseTask()
}
3 changes: 3 additions & 0 deletions resourcemanager/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
var (
// MinSchedulerInterval is the minimum interval between two scheduling.
MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)

// MaxOverclockCount is the maximum number of overclock goroutine.
MaxOverclockCount = 1
)

// GoroutinePool is a pool interface
Expand Down
12 changes: 8 additions & 4 deletions util/gpool/spmc/spmcpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,18 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
if capacity == -1 || size <= 0 || size == capacity {
return
}
if p.taskManager.GetOriginConcurrency()+int32(util.MaxOverclockCount) < int32(size) {
return
}
p.SetLastTuneTs(time.Now())
p.capacity.Store(int32(size))
if size > capacity {
for i := 0; i < size-capacity; i++ {
if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
if tid, boostTask := p.taskManager.Overclock(size); boostTask != nil {
p.addWaitingTask()
p.taskManager.AddSubTask(tid, boostTask.Clone())
p.taskCh <- boostTask
newTask := boostTask.Clone()
p.taskManager.AddSubTask(tid, newTask)
p.taskCh <- newTask
}
}
if size-capacity == 1 {
Expand All @@ -155,7 +159,7 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
return
}
if size < capacity {
p.taskManager.Downclock()
p.taskManager.Downclock(size)
}
}

Expand Down
39 changes: 30 additions & 9 deletions util/gpool/spmc/spmcpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,22 @@ func testTunePool(t *testing.T, name string) {
}
}()
time.Sleep(1 * time.Second)
newSize := pool.Cap() - 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
downclockPool(t, pool, tid)
overclockPool(t, pool, tid)

newSize = pool.Cap() + 1
pool.Tune(newSize)
// at Overclock mode
overclockPool(t, pool, tid)

// Overclock mode, But it is invalid. It should keep the same size.
size := pool.Cap()
pool.Tune(pool.Cap() + 1)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
require.Equal(t, size, pool.Cap())
require.Equal(t, int32(size), pool.taskManager.Running(tid))

for n := pool.Cap(); n > 1; n-- {
downclockPool(t, pool, tid)
}

// exit test
close(exit)
Expand All @@ -195,6 +200,22 @@ func testTunePool(t *testing.T, name string) {
pool.ReleaseAndWait()
}

func overclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
newSize := pool.Cap() + 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
}

func downclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
newSize := pool.Cap() - 1
pool.Tune(newSize)
time.Sleep(1 * time.Second)
require.Equal(t, newSize, pool.Cap())
require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
}

func TestPoolWithEnoughCapacity(t *testing.T) {
const (
RunTimes = 1000
Expand Down
2 changes: 1 addition & 1 deletion util/gpool/spmc/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (w *goWorker[T, U, C, CT, TF]) run() {
f.GetResultCh() <- w.pool.consumerFunc(t.Task, f.ConstArgs(), ctx)
f.Done()
}
w.pool.ExitSubTask(f.TaskID())
}
w.pool.ExitSubTask(f.TaskID())
f.Finish()
if ok := w.pool.revertWorker(w); !ok {
return
Expand Down

0 comments on commit 337af61

Please sign in to comment.