Skip to content

Commit

Permalink
Simplified workers wait group in memcachedClient
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <[email protected]>
  • Loading branch information
pracucci committed Dec 13, 2019
1 parent 2e87462 commit 38a9a41
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ type memcachedClient struct {
stop chan struct{}

// Channel used to enqueue async operations.
asyncQueue chan func()
asyncWorkers sync.WaitGroup
asyncQueue chan func()

// Channel used to enqueue get multi operations.
getMultiQueue chan *memcachedGetMultiBatch
getMultiWorkers sync.WaitGroup
getMultiQueue chan *memcachedGetMultiBatch

// Wait group used to wait all workers on stopping.
workers sync.WaitGroup
}

type memcachedGetMultiBatch struct {
Expand Down Expand Up @@ -195,14 +196,14 @@ func newMemcachedClient(logger log.Logger, client memcachedClientBackend, select

// Start a number of goroutines - processing async operations - equal
// to the max concurrency we have.
c.asyncWorkers.Add(c.config.MaxAsyncConcurrency)
c.workers.Add(c.config.MaxAsyncConcurrency)
for i := 0; i < c.config.MaxAsyncConcurrency; i++ {
go c.asyncQueueProcessLoop()
}

// Start a number of goroutines - processing get multi batch operations - equal
// to the max concurrency we have.
c.getMultiWorkers.Add(c.config.MaxGetMultiBatchConcurrency)
c.workers.Add(c.config.MaxGetMultiBatchConcurrency)
for i := 0; i < c.config.MaxGetMultiBatchConcurrency; i++ {
go c.getMultiQueueProcessLoop()
}
Expand All @@ -214,8 +215,7 @@ func (c *memcachedClient) Stop() {
close(c.stop)

// Wait until all workers have terminated.
c.asyncWorkers.Wait()
c.getMultiWorkers.Wait()
c.workers.Wait()
}

func (c *memcachedClient) SetAsync(key string, value []byte, ttl time.Duration) error {
Expand Down Expand Up @@ -320,7 +320,7 @@ func (c *memcachedClient) enqueueAsync(op func()) error {
}

func (c *memcachedClient) asyncQueueProcessLoop() {
defer c.asyncWorkers.Done()
defer c.workers.Done()

for {
select {
Expand All @@ -333,7 +333,7 @@ func (c *memcachedClient) asyncQueueProcessLoop() {
}

func (c *memcachedClient) getMultiQueueProcessLoop() {
defer c.getMultiWorkers.Done()
defer c.workers.Done()

for {
select {
Expand Down

0 comments on commit 38a9a41

Please sign in to comment.