diff --git a/src/x/sync/pooled_worker_pool.go b/src/x/sync/pooled_worker_pool.go index 1cba76062f..d65344d237 100644 --- a/src/x/sync/pooled_worker_pool.go +++ b/src/x/sync/pooled_worker_pool.go @@ -36,13 +36,15 @@ const ( type pooledWorkerPool struct { sync.Mutex - numRoutinesAtomic int64 - numRoutinesGauge tally.Gauge - growOnDemand bool - workChs []chan Work - numShards int64 - killWorkerProbability float64 - nowFn NowFn + numRoutinesAtomic int64 + numWorkingRoutinesAtomic int64 + numRoutinesGauge tally.Gauge + numWorkingRoutinesGauge tally.Gauge + growOnDemand bool + workChs []chan Work + numShards int64 + killWorkerProbability float64 + nowFn NowFn } // NewPooledWorkerPool creates a new worker pool. @@ -62,13 +64,15 @@ func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPo } return &pooledWorkerPool{ - numRoutinesAtomic: 0, - numRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-routines"), - growOnDemand: opts.GrowOnDemand(), - workChs: workChs, - numShards: numShards, - killWorkerProbability: opts.KillWorkerProbability(), - nowFn: opts.NowFn(), + numRoutinesAtomic: 0, + numWorkingRoutinesAtomic: 0, + numRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-routines"), + numWorkingRoutinesGauge: opts.InstrumentOptions().MetricsScope().Gauge("num-working-routines"), + growOnDemand: opts.GrowOnDemand(), + workChs: workChs, + numShards: numShards, + killWorkerProbability: opts.KillWorkerProbability(), + nowFn: opts.NowFn(), }, nil } @@ -91,6 +95,7 @@ func (p *pooledWorkerPool) Go(work Work) { if currTime%numGoroutinesGaugeSampleRate == 0 { p.emitNumRoutines() + p.emitNumWorkingRoutines() } if !p.growOnDemand { @@ -138,7 +143,9 @@ func (p *pooledWorkerPool) spawnWorker( killThreshold = uint64(p.killWorkerProbability * float64(math.MaxUint64)) ) for f := range workCh { + p.incNumWorkingRoutines() f() + p.decNumWorkingRoutines() if rng.Random() < killThreshold { if spawnReplacement { p.spawnWorker(rng.Random(), nil, workCh, true) @@ -166,3 +173,20 @@ func (p *pooledWorkerPool) decNumRoutines() { func (p *pooledWorkerPool) getNumRoutines() int64 { return atomic.LoadInt64(&p.numRoutinesAtomic) } + +func (p *pooledWorkerPool) emitNumWorkingRoutines() { + numRoutines := float64(p.getNumWorkingRoutines()) + p.numWorkingRoutinesGauge.Update(numRoutines) +} + +func (p *pooledWorkerPool) incNumWorkingRoutines() { + atomic.AddInt64(&p.numWorkingRoutinesAtomic, 1) +} + +func (p *pooledWorkerPool) decNumWorkingRoutines() { + atomic.AddInt64(&p.numWorkingRoutinesAtomic, -1) +} + +func (p *pooledWorkerPool) getNumWorkingRoutines() int64 { + return atomic.LoadInt64(&p.numWorkingRoutinesAtomic) +}