From ae01f49b2220b33fc24e9e46f4679ebf0666087f Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 10:41:32 +0800 Subject: [PATCH 1/9] spool: fix data race when to exit Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/BUILD.bazel | 1 + resourcemanager/pool/spool/spool.go | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/resourcemanager/pool/spool/BUILD.bazel b/resourcemanager/pool/spool/BUILD.bazel index dc5fb22629405..fade4ae589e5a 100644 --- a/resourcemanager/pool/spool/BUILD.bazel +++ b/resourcemanager/pool/spool/BUILD.bazel @@ -30,6 +30,7 @@ go_test( "spool_test.go", ], embed = [":spool"], + race = "on", shard_count = 2, deps = [ "//resourcemanager/pool", diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index a3a3de16a08c9..e40e81c242a68 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -31,6 +31,8 @@ import ( "go.uber.org/zap" ) +const waitInterval = 5 * time.Millisecond + // Pool is a single goroutine pool. it can not reuse the goroutine. type Pool struct { wg sync.WaitGroup @@ -154,7 +156,10 @@ func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) { return 0, false } p.mu.Unlock() - time.Sleep(5 * time.Millisecond) + if p.isStop.Load() { + return 0, false + } + time.Sleep(waitInterval) } } @@ -173,6 +178,8 @@ func (p *Pool) checkAndAddRunningInternal(concurrency int32) (conc int32, run bo // ReleaseAndWait releases the pool and waits for all tasks to be completed. func (p *Pool) ReleaseAndWait() { p.isStop.Store(true) + // wait for all the task in the pending to exit + time.Sleep(2 * waitInterval) p.wg.Wait() resourcemanager.InstanceResourceManager.Unregister(p.Name()) } From abb761170b33c74ecfc7d54b6ada4d73d9ec23b9 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 11:23:57 +0800 Subject: [PATCH 2/9] update --- resourcemanager/pool/spool/spool.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index e40e81c242a68..5121723a004dd 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -145,6 +145,9 @@ func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error { // checkAndAddRunning is to check if a task can run. If can, add the running number. func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) { for { + if p.isStop.Load() { + return 0, false + } p.mu.Lock() value, run := p.checkAndAddRunningInternal(int32(concurrency)) if run { @@ -156,9 +159,6 @@ func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) { return 0, false } p.mu.Unlock() - if p.isStop.Load() { - return 0, false - } time.Sleep(waitInterval) } } From c3e149f7a8ba4de4a73ad112ab52a13ed51e324a Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 14:09:58 +0800 Subject: [PATCH 3/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/spool.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index 5121723a004dd..44220767e1b3c 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -40,6 +40,7 @@ type Pool struct { options *Options capacity int32 running atomic.Int32 + waiting atomic.Int32 isStop atomic.Bool concurrencyMetrics prometheus.Gauge taskManager pooltask.TaskManager[any, any, any, any, pooltask.NilContext] @@ -144,6 +145,8 @@ func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error { // checkAndAddRunning is to check if a task can run. If can, add the running number. func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) { + p.waiting.Add(1) + defer p.waiting.Add(-1) for { if p.isStop.Load() { return 0, false @@ -179,7 +182,9 @@ func (p *Pool) checkAndAddRunningInternal(concurrency int32) (conc int32, run bo func (p *Pool) ReleaseAndWait() { p.isStop.Store(true) // wait for all the task in the pending to exit - time.Sleep(2 * waitInterval) + for p.waiting.Load() > 0 { + time.Sleep(waitInterval) + } p.wg.Wait() resourcemanager.InstanceResourceManager.Unregister(p.Name()) } From 3bcc1967b0198bd0ddea092ac60047410eea14ce Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 14:58:47 +0800 Subject: [PATCH 4/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/spool.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index 44220767e1b3c..5a300586eca14 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -82,6 +82,8 @@ func (p *Pool) Tune(size int32) { // Run runs a function in the pool. func (p *Pool) Run(fn func()) error { + p.waiting.Add(1) + defer p.waiting.Add(-1) if p.isStop.Load() { return pool.ErrPoolClosed } @@ -124,6 +126,8 @@ func (p *Pool) run(fn func()) { // RunWithConcurrency runs a function in the pool with concurrency. func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error { + p.waiting.Add(1) + defer p.waiting.Add(-1) if p.isStop.Load() { return pool.ErrPoolClosed } @@ -145,8 +149,6 @@ func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error { // checkAndAddRunning is to check if a task can run. If can, add the running number. func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) { - p.waiting.Add(1) - defer p.waiting.Add(-1) for { if p.isStop.Load() { return 0, false @@ -183,7 +185,6 @@ func (p *Pool) ReleaseAndWait() { p.isStop.Store(true) // wait for all the task in the pending to exit for p.waiting.Load() > 0 { - time.Sleep(waitInterval) } p.wg.Wait() resourcemanager.InstanceResourceManager.Unregister(p.Name()) From 0183d5c06c844eea3f1ac700a5143165374267c9 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 15:08:55 +0800 Subject: [PATCH 5/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/spool.go | 1 + 1 file changed, 1 insertion(+) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index 5a300586eca14..243c519ae5df4 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -185,6 +185,7 @@ func (p *Pool) ReleaseAndWait() { p.isStop.Store(true) // wait for all the task in the pending to exit for p.waiting.Load() > 0 { + _ = "" // it is empty statement to skip empty-block linter. } p.wg.Wait() resourcemanager.InstanceResourceManager.Unregister(p.Name()) From 0f0a8af81dc75ef5e9e93656ab6a2d5a9296a8dc Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 15:17:50 +0800 Subject: [PATCH 6/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/resourcemanager/pool/spool/BUILD.bazel b/resourcemanager/pool/spool/BUILD.bazel index fade4ae589e5a..c8f6788d824f5 100644 --- a/resourcemanager/pool/spool/BUILD.bazel +++ b/resourcemanager/pool/spool/BUILD.bazel @@ -30,6 +30,7 @@ go_test( "spool_test.go", ], embed = [":spool"], + flaky = True, race = "on", shard_count = 2, deps = [ From 265294daacdf9787b46c16f6a91fcabd569df876 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 15:35:12 +0800 Subject: [PATCH 7/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/spool.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index 243c519ae5df4..13c7200b407c2 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -42,6 +42,7 @@ type Pool struct { running atomic.Int32 waiting atomic.Int32 isStop atomic.Bool + cond sync.Cond concurrencyMetrics prometheus.Gauge taskManager pooltask.TaskManager[any, any, any, any, pooltask.NilContext] pool.BasePool @@ -55,6 +56,7 @@ func NewPool(name string, size int32, component util.Component, options ...Optio concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name), taskManager: pooltask.NewTaskManager[any, any, any, any, pooltask.NilContext](size), // TODO: this general type } + result.cond = *sync.NewCond(&result.mu) if size == 0 { return nil, pool.ErrPoolParamsInvalid } @@ -83,6 +85,7 @@ func (p *Pool) Tune(size int32) { // Run runs a function in the pool. func (p *Pool) Run(fn func()) error { p.waiting.Add(1) + defer p.cond.Signal() defer p.waiting.Add(-1) if p.isStop.Load() { return pool.ErrPoolClosed @@ -127,6 +130,7 @@ func (p *Pool) run(fn func()) { // RunWithConcurrency runs a function in the pool with concurrency. func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error { p.waiting.Add(1) + defer p.cond.Signal() defer p.waiting.Add(-1) if p.isStop.Load() { return pool.ErrPoolClosed @@ -185,7 +189,7 @@ func (p *Pool) ReleaseAndWait() { p.isStop.Store(true) // wait for all the task in the pending to exit for p.waiting.Load() > 0 { - _ = "" // it is empty statement to skip empty-block linter. + p.cond.Wait() } p.wg.Wait() resourcemanager.InstanceResourceManager.Unregister(p.Name()) From 4fbb9ec8c5f73f21fdbfb22708cd42fb0359de46 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 15:46:36 +0800 Subject: [PATCH 8/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/spool.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index 13c7200b407c2..c48e95e6fcc29 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -42,6 +42,7 @@ type Pool struct { running atomic.Int32 waiting atomic.Int32 isStop atomic.Bool + condMu sync.Mutex cond sync.Cond concurrencyMetrics prometheus.Gauge taskManager pooltask.TaskManager[any, any, any, any, pooltask.NilContext] @@ -56,7 +57,7 @@ func NewPool(name string, size int32, component util.Component, options ...Optio concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name), taskManager: pooltask.NewTaskManager[any, any, any, any, pooltask.NilContext](size), // TODO: this general type } - result.cond = *sync.NewCond(&result.mu) + result.cond = *sync.NewCond(&result.condMu) if size == 0 { return nil, pool.ErrPoolParamsInvalid } From 6277e79a37c66a718a2b677447eb1413b3555679 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Mon, 13 Mar 2023 16:05:00 +0800 Subject: [PATCH 9/9] update Signed-off-by: Weizhen Wang --- resourcemanager/pool/spool/spool.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/resourcemanager/pool/spool/spool.go b/resourcemanager/pool/spool/spool.go index c48e95e6fcc29..d3f2fbefcf890 100644 --- a/resourcemanager/pool/spool/spool.go +++ b/resourcemanager/pool/spool/spool.go @@ -189,9 +189,11 @@ func (p *Pool) checkAndAddRunningInternal(concurrency int32) (conc int32, run bo func (p *Pool) ReleaseAndWait() { p.isStop.Store(true) // wait for all the task in the pending to exit + p.cond.L.Lock() for p.waiting.Load() > 0 { p.cond.Wait() } + p.cond.L.Unlock() p.wg.Wait() resourcemanager.InstanceResourceManager.Unregister(p.Name()) }