From 337af61d95211262423a98ed6054737df59cf889 Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Thu, 2 Feb 2023 13:33:55 +0800
Subject: [PATCH] resourcemanager: fix deleting more workers than expected (#40894)

close pingcap/tidb#40893
---
 resourcemanager/pooltask/task_manager.go |  7 +++-
 .../pooltask/task_manager_scheduler.go   | 13 ++++---
 resourcemanager/util/util.go             |  3 ++
 util/gpool/spmc/spmcpool.go              | 12 ++++--
 util/gpool/spmc/spmcpool_test.go         | 39 ++++++++++++++-----
 util/gpool/spmc/worker.go                |  2 +-
 6 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/resourcemanager/pooltask/task_manager.go b/resourcemanager/pooltask/task_manager.go
index 66d6451b163ba..e87c222569792 100644
--- a/resourcemanager/pooltask/task_manager.go
+++ b/resourcemanager/pooltask/task_manager.go
@@ -65,7 +65,7 @@ type TaskManager[T any, U any, C any, CT any, TF Context[CT]] struct {
 	concurrency int32
 }
 
-// NewTaskManager create a new pooltask manager.
+// NewTaskManager creates a new pool task manager.
 func NewTaskManager[T any, U any, C any, CT any, TF Context[CT]](c int32) TaskManager[T, U, C, CT, TF] {
 	task := make([]TaskStatusContainer[T, U, C, CT, TF], shard)
 	for i := 0; i < shard; i++ {
@@ -148,3 +148,8 @@ func (t *TaskManager[T, U, C, CT, TF]) StopTask(taskID uint64) {
 		}
 	}
 }
+
+// GetOriginConcurrency returns the concurrency the pool was initialized with.
+func (t *TaskManager[T, U, C, CT, TF]) GetOriginConcurrency() int32 {
+	return t.concurrency
+}
diff --git a/resourcemanager/pooltask/task_manager_scheduler.go b/resourcemanager/pooltask/task_manager_scheduler.go
index dcc158df06d82..73c5ee46f099a 100644
--- a/resourcemanager/pooltask/task_manager_scheduler.go
+++ b/resourcemanager/pooltask/task_manager_scheduler.go
@@ -15,14 +15,17 @@
 package pooltask
 
 // Overclock is to increase the concurrency of pool.
-func (t *TaskManager[T, U, C, CT, TF]) Overclock() (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
-	if t.concurrency > t.running.Load() {
-		return t.getBoostTask()
+func (t *TaskManager[T, U, C, CT, TF]) Overclock(capacity int) (tid uint64, task *TaskBox[T, U, C, CT, TF]) {
+	if t.running.Load() >= int32(capacity) {
+		return
 	}
-	return 0, nil
+	return t.getBoostTask()
 }
 
 // Downclock is to decrease the concurrency of pool.
-func (t *TaskManager[T, U, C, CT, TF]) Downclock() {
+func (t *TaskManager[T, U, C, CT, TF]) Downclock(capacity int) {
+	if t.running.Load() <= int32(capacity) {
+		return
+	}
 	t.pauseTask()
 }
diff --git a/resourcemanager/util/util.go b/resourcemanager/util/util.go
index 6d1959bd08904..92b7ddfe8f979 100644
--- a/resourcemanager/util/util.go
+++ b/resourcemanager/util/util.go
@@ -23,6 +23,9 @@ import (
 var (
 	// MinSchedulerInterval is the minimum interval between two scheduling.
 	MinSchedulerInterval = atomic.NewDuration(200 * time.Millisecond)
+
+	// MaxOverclockCount is the maximum number of overclock goroutines.
+	MaxOverclockCount = 1
 )
 
 // GoroutinePool is a pool interface
diff --git a/util/gpool/spmc/spmcpool.go b/util/gpool/spmc/spmcpool.go
index 5f58bba12d5b4..6f65ca98aba01 100644
--- a/util/gpool/spmc/spmcpool.go
+++ b/util/gpool/spmc/spmcpool.go
@@ -137,14 +137,18 @@ func (p *Pool[T, U, C, CT, TF]) Tune(size int) {
 	if capacity == -1 || size <= 0 || size == capacity {
 		return
 	}
+	if p.taskManager.GetOriginConcurrency()+int32(util.MaxOverclockCount) < int32(size) {
+		return
+	}
 	p.SetLastTuneTs(time.Now())
 	p.capacity.Store(int32(size))
 	if size > capacity {
 		for i := 0; i < size-capacity; i++ {
-			if tid, boostTask := p.taskManager.Overclock(); boostTask != nil {
+			if tid, boostTask := p.taskManager.Overclock(size); boostTask != nil {
 				p.addWaitingTask()
-				p.taskManager.AddSubTask(tid, boostTask.Clone())
-				p.taskCh <- boostTask
+				newTask := boostTask.Clone()
+				p.taskManager.AddSubTask(tid, newTask)
+				p.taskCh <- newTask
 			}
 		}
 		if size-capacity == 1 {
@@ -155,7 +159,7 @@
 		return
 	}
 	if size < capacity {
-		p.taskManager.Downclock()
+		p.taskManager.Downclock(size)
 	}
 }
 
diff --git a/util/gpool/spmc/spmcpool_test.go b/util/gpool/spmc/spmcpool_test.go
index 5bc5da4fdf3bc..83d02d2d47ac2 100644
--- a/util/gpool/spmc/spmcpool_test.go
+++ b/util/gpool/spmc/spmcpool_test.go
@@ -175,17 +175,22 @@
 		}
 	}()
 	time.Sleep(1 * time.Second)
-	newSize := pool.Cap() - 1
-	pool.Tune(newSize)
-	time.Sleep(1 * time.Second)
-	require.Equal(t, newSize, pool.Cap())
-	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+	downclockPool(t, pool, tid)
+	overclockPool(t, pool, tid)
 
-	newSize = pool.Cap() + 1
-	pool.Tune(newSize)
+	// now in overclock mode
+	overclockPool(t, pool, tid)
+
+	// Overclocking beyond MaxOverclockCount is invalid; the pool should keep the same size.
+	size := pool.Cap()
+	pool.Tune(pool.Cap() + 1)
 	time.Sleep(1 * time.Second)
-	require.Equal(t, newSize, pool.Cap())
-	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+	require.Equal(t, size, pool.Cap())
+	require.Equal(t, int32(size), pool.taskManager.Running(tid))
+
+	for n := pool.Cap(); n > 1; n-- {
+		downclockPool(t, pool, tid)
+	}
 
 	// exit test
 	close(exit)
@@ -195,6 +200,22 @@
 	pool.ReleaseAndWait()
 }
 
+func overclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
+	newSize := pool.Cap() + 1
+	pool.Tune(newSize)
+	time.Sleep(1 * time.Second)
+	require.Equal(t, newSize, pool.Cap())
+	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+}
+
+func downclockPool[T any, U any, C any, CT any, TF pooltask.Context[CT]](t *testing.T, pool *Pool[T, U, C, CT, TF], tid uint64) {
+	newSize := pool.Cap() - 1
+	pool.Tune(newSize)
+	time.Sleep(1 * time.Second)
+	require.Equal(t, newSize, pool.Cap())
+	require.Equal(t, int32(newSize), pool.taskManager.Running(tid))
+}
+
 func TestPoolWithEnoughCapacity(t *testing.T) {
 	const (
 		RunTimes = 1000
diff --git a/util/gpool/spmc/worker.go b/util/gpool/spmc/worker.go
index 158c677775987..6076aacc317ed 100644
--- a/util/gpool/spmc/worker.go
+++ b/util/gpool/spmc/worker.go
@@ -72,8 +72,8 @@ func (w *goWorker[T, U, C, CT, TF]) run() {
 				f.GetResultCh() <- w.pool.consumerFunc(t.Task, f.ConstArgs(), ctx)
 				f.Done()
 			}
-			w.pool.ExitSubTask(f.TaskID())
 		}
+		w.pool.ExitSubTask(f.TaskID())
 		f.Finish()
 		if ok := w.pool.revertWorker(w); !ok {
 			return
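
Review note: the sketch below restates the guard logic this patch adds as a self-contained Go program. It is a minimal illustration under stated assumptions, not the real spmc pool: demoPool, overclock, downclock, and tune are hypothetical stand-ins for Pool, TaskManager.Overclock, TaskManager.Downclock, and Pool.Tune, and maxOverclockCount mirrors util.MaxOverclockCount. The point it demonstrates: overclock and downclock now compare the live running count against the target capacity, and tune rejects any size beyond the original concurrency plus the overclock headroom, so repeated downclocks can no longer remove more workers than expected.

package main

import (
	"fmt"
	"sync/atomic"
)

// maxOverclockCount mirrors util.MaxOverclockCount from the patch: the
// headroom allowed above the pool's original concurrency.
const maxOverclockCount = 1

// demoPool is a hypothetical stand-in for the spmc pool. It models only
// the bookkeeping the patch touches: the original concurrency, the tuned
// capacity, and the number of running workers.
type demoPool struct {
	origin   int32
	capacity atomic.Int32
	running  atomic.Int32
}

func newDemoPool(c int32) *demoPool {
	p := &demoPool{origin: c}
	p.capacity.Store(c)
	p.running.Store(c)
	return p
}

// overclock adds a worker only while the pool runs below the target
// capacity, like TaskManager.Overclock(capacity) after the patch.
func (p *demoPool) overclock(capacity int) {
	if p.running.Load() >= int32(capacity) {
		return // already at or above the target: nothing to boost
	}
	p.running.Add(1)
}

// downclock pauses a worker only while the pool runs above the target
// capacity, like TaskManager.Downclock(capacity) after the patch. Before
// the fix the pause was unconditional, so a burst of tune calls could
// delete more workers than expected.
func (p *demoPool) downclock(capacity int) {
	if p.running.Load() <= int32(capacity) {
		return
	}
	p.running.Add(-1)
}

// tune mirrors the guarded Pool.Tune: it rejects sizes beyond
// origin+maxOverclockCount, then steps toward the new size one worker at
// a time, re-checking the running count at every step.
func (p *demoPool) tune(size int) {
	capacity := int(p.capacity.Load())
	if size <= 0 || size == capacity {
		return
	}
	if p.origin+maxOverclockCount < int32(size) {
		return // invalid overclock request: keep the current size
	}
	p.capacity.Store(int32(size))
	for i := 0; i < size-capacity; i++ {
		p.overclock(size)
	}
	for i := 0; i < capacity-size; i++ {
		p.downclock(size)
	}
}

func main() {
	p := newDemoPool(4)
	p.tune(5) // allowed: one worker of overclock headroom
	p.tune(6) // rejected: beyond origin+maxOverclockCount, size stays 5
	p.tune(2) // shrinks, but each downclock re-checks the target
	fmt.Println("capacity:", p.capacity.Load(), "running:", p.running.Load())
	// Output: capacity: 2 running: 2
}

Running the sketch prints capacity 2 and running 2: the shrink stops exactly at the requested size, which is the behavior the per-step capacity check in this patch guarantees.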