Skip to content

Commit

Permalink
Revert "Using atomic instead of mutex and delete scratch slice (valya…
Browse files Browse the repository at this point in the history
…la#1833)"

This reverts commit 19c50cd.
  • Loading branch information
newacorn committed Sep 7, 2024
1 parent df22176 commit 8b0df66
Showing 1 changed file with 91 additions and 74 deletions.
165 changes: 91 additions & 74 deletions workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
)

Expand All @@ -22,57 +21,29 @@ type workerPool struct {

// Function for serving server connections.
// It must leave c unclosed.
ready workerChanStack
WorkerFunc ServeHandler

stopCh chan struct{}

connState func(net.Conn, ConnState)

ready []*workerChan

MaxWorkersCount int

MaxIdleWorkerDuration time.Duration

workersCount int32
workersCount int

mustStop atomic.Bool
lock sync.Mutex

LogAllErrors bool
mustStop bool
}

type workerChan struct {
next *workerChan

ch chan net.Conn

lastUseTime int64
}

type workerChanStack struct {
head atomic.Pointer[workerChan]
}

func (s *workerChanStack) push(ch *workerChan) {
for {
oldHead := s.head.Load()
ch.next = oldHead
if s.head.CompareAndSwap(oldHead, ch) {
break
}
}
}

func (s *workerChanStack) pop() *workerChan {
for {
oldHead := s.head.Load()
if oldHead == nil {
return nil
}

if s.head.CompareAndSwap(oldHead, oldHead.next) {
return oldHead
}
}
lastUseTime time.Time
ch chan net.Conn
}

func (wp *workerPool) Start() {
Expand All @@ -87,8 +58,9 @@ func (wp *workerPool) Start() {
}
}
go func() {
var scratch []*workerChan
for {
wp.clean()
wp.clean(&scratch)
select {
case <-stopCh:
return
Expand All @@ -109,15 +81,15 @@ func (wp *workerPool) Stop() {
// Stop all the workers waiting for incoming connections.
// Do not wait for busy workers - they will stop after
// serving the connection and noticing wp.mustStop = true.

for {
ch := wp.ready.pop()
if ch == nil {
break
}
ch.ch <- nil
wp.lock.Lock()
ready := wp.ready
for i := range ready {
ready[i].ch <- nil
ready[i] = nil
}
wp.mustStop.Store(true)
wp.ready = ready[:0]
wp.mustStop = true
wp.lock.Unlock()
}

func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
Expand All @@ -127,22 +99,50 @@ func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
return wp.MaxIdleWorkerDuration
}

func (wp *workerPool) clean() {
func (wp *workerPool) clean(scratch *[]*workerChan) {
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
criticalTime := time.Now().Add(-maxIdleWorkerDuration).UnixNano()

for {
current := wp.ready.head.Load()
if current == nil || atomic.LoadInt64(&current.lastUseTime) >= criticalTime {
break
}
// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()
ready := wp.ready
n := len(ready)

next := current.next
if wp.ready.head.CompareAndSwap(current, next) {
current.ch <- nil
wp.workerChanPool.Put(current)
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r := 0, n-1
for l <= r {
mid := (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
}
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}

*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
}
}

func (wp *workerPool) Serve(c net.Conn) bool {
Expand All @@ -169,32 +169,47 @@ var workerChanCap = func() int {
}()

func (wp *workerPool) getCh() *workerChan {
for {
ch := wp.ready.pop()
if ch != nil {
return ch
var ch *workerChan
createWorker := false

wp.lock.Lock()
ready := wp.ready
n := len(ready) - 1
if n < 0 {
if wp.workersCount < wp.MaxWorkersCount {
createWorker = true
wp.workersCount++
}
} else {
ch = ready[n]
ready[n] = nil
wp.ready = ready[:n]
}
wp.lock.Unlock()

currentWorkers := atomic.LoadInt32(&wp.workersCount)
if currentWorkers < int32(wp.MaxWorkersCount) {
if atomic.CompareAndSwapInt32(&wp.workersCount, currentWorkers, currentWorkers+1) {
ch = wp.workerChanPool.Get().(*workerChan)
go wp.workerFunc(ch)
return ch
}
} else {
break
if ch == nil {
if !createWorker {
return nil
}
vch := wp.workerChanPool.Get()
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
wp.workerChanPool.Put(vch)
}()
}
return nil
return ch
}

func (wp *workerPool) release(ch *workerChan) bool {
atomic.StoreInt64(&ch.lastUseTime, time.Now().UnixNano())
if wp.mustStop.Load() {
ch.lastUseTime = time.Now()
wp.lock.Lock()
if wp.mustStop {
wp.lock.Unlock()
return false
}
wp.ready.push(ch)
wp.ready = append(wp.ready, ch)
wp.lock.Unlock()
return true
}

Expand Down Expand Up @@ -230,5 +245,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
}
}

atomic.AddInt32(&wp.workersCount, -1)
wp.lock.Lock()
wp.workersCount--
wp.lock.Unlock()
}

0 comments on commit 8b0df66

Please sign in to comment.