Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spool: fix data race when to exit #42129

Merged
merged 9 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions resourcemanager/pool/spool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ go_test(
"spool_test.go",
],
embed = [":spool"],
flaky = True,
race = "on",
shard_count = 2,
deps = [
"//resourcemanager/pool",
Expand Down
23 changes: 22 additions & 1 deletion resourcemanager/pool/spool/spool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ import (
"go.uber.org/zap"
)

const waitInterval = 5 * time.Millisecond

// Pool is a single goroutine pool. it can not reuse the goroutine.
type Pool struct {
wg sync.WaitGroup
mu deadlock.RWMutex
options *Options
capacity int32
running atomic.Int32
waiting atomic.Int32
isStop atomic.Bool
condMu sync.Mutex
cond sync.Cond
concurrencyMetrics prometheus.Gauge
taskManager pooltask.TaskManager[any, any, any, any, pooltask.NilContext]
pool.BasePool
Expand All @@ -52,6 +57,7 @@ func NewPool(name string, size int32, component util.Component, options ...Optio
concurrencyMetrics: metrics.PoolConcurrencyCounter.WithLabelValues(name),
taskManager: pooltask.NewTaskManager[any, any, any, any, pooltask.NilContext](size), // TODO: this general type
}
result.cond = *sync.NewCond(&result.condMu)
if size == 0 {
return nil, pool.ErrPoolParamsInvalid
}
Expand Down Expand Up @@ -79,6 +85,9 @@ func (p *Pool) Tune(size int32) {

// Run runs a function in the pool.
func (p *Pool) Run(fn func()) error {
p.waiting.Add(1)
defer p.cond.Signal()
defer p.waiting.Add(-1)
if p.isStop.Load() {
return pool.ErrPoolClosed
}
Expand Down Expand Up @@ -121,6 +130,9 @@ func (p *Pool) run(fn func()) {

// RunWithConcurrency runs a function in the pool with concurrency.
func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error {
p.waiting.Add(1)
defer p.cond.Signal()
defer p.waiting.Add(-1)
if p.isStop.Load() {
return pool.ErrPoolClosed
}
Expand All @@ -143,6 +155,9 @@ func (p *Pool) RunWithConcurrency(fns chan func(), concurrency uint32) error {
// checkAndAddRunning is to check if a task can run. If can, add the running number.
func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) {
for {
if p.isStop.Load() {
return 0, false
}
p.mu.Lock()
value, run := p.checkAndAddRunningInternal(int32(concurrency))
if run {
Expand All @@ -154,7 +169,7 @@ func (p *Pool) checkAndAddRunning(concurrency uint32) (conc int32, run bool) {
return 0, false
}
p.mu.Unlock()
time.Sleep(5 * time.Millisecond)
time.Sleep(waitInterval)
}
}

Expand All @@ -173,6 +188,12 @@ func (p *Pool) checkAndAddRunningInternal(concurrency int32) (conc int32, run bo
// ReleaseAndWait releases the pool and waits for all tasks to be completed.
func (p *Pool) ReleaseAndWait() {
p.isStop.Store(true)
// wait for all the task in the pending to exit
p.cond.L.Lock()
for p.waiting.Load() > 0 {
p.cond.Wait()
}
p.cond.L.Unlock()
p.wg.Wait()
resourcemanager.InstanceResourceManager.Unregister(p.Name())
}