From 6f030f41bca54c8c02227f399cc7d848f5495bae Mon Sep 17 00:00:00 2001 From: Victor Agababov Date: Fri, 8 Feb 2019 19:17:41 -0800 Subject: [PATCH] Semaphore is the internal implementation detail of the breaker. (#3143) Hence there is no reason for it to be public. --- pkg/queue/breaker.go | 20 ++++++++++---------- pkg/queue/breaker_test.go | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/queue/breaker.go b/pkg/queue/breaker.go index 83f71feb1dfd..8020021e4955 100644 --- a/pkg/queue/breaker.go +++ b/pkg/queue/breaker.go @@ -47,7 +47,7 @@ type token struct{} // beyond the limit of the queue are failed immediately. type Breaker struct { pendingRequests chan token - sem *Semaphore + sem *semaphore logger *zap.SugaredLogger } @@ -113,23 +113,23 @@ func (b *Breaker) Capacity() int32 { // Maximal capacity is the size of the buffered channel, it defines maximum number of tokens // in the rotation. Attempting to add more capacity then the max will result in error. // Initial capacity is the initial number of free tokens. -func NewSemaphore(maxCapacity, initialCapacity int32) *Semaphore { +func NewSemaphore(maxCapacity, initialCapacity int32) *semaphore { if initialCapacity < 0 || initialCapacity > maxCapacity { panic(fmt.Sprintf("Initial capacity must be between 0 and maximal capacity. Got %v.", initialCapacity)) } queue := make(chan token, maxCapacity) - sem := Semaphore{queue: queue} + sem := semaphore{queue: queue} if initialCapacity > 0 { sem.UpdateCapacity(initialCapacity) } return &sem } -// Semaphore is an implementation of a semaphore based on Go channels. +// semaphore is an implementation of a semaphore based on Go channels. // The presence of elements in the `queue` buffered channel correspond to available tokens. // Hence the max number of tokens to hand out equals to the size of the channel. // `capacity` defines the current number of tokens in the rotation. -type Semaphore struct { +type semaphore struct { queue chan token token token reducers int32 @@ -138,14 +138,14 @@ type Semaphore struct { } // Acquire receives the token from the semaphore, potentially blocking. -func (s *Semaphore) Acquire() { +func (s *semaphore) Acquire() { <-s.queue } // Release potentially puts the token back to the queue. // If the semaphore capacity was reduced in between and is not yet reflected, // we remove the tokens from the rotation instead of returning them back. -func (s *Semaphore) Release() error { +func (s *semaphore) Release() error { s.mux.Lock() defer s.mux.Unlock() @@ -167,7 +167,7 @@ func (s *Semaphore) Release() error { // UpdateCapacity updates the capacity of the semaphore to the desired // size. -func (s *Semaphore) UpdateCapacity(size int32) error { +func (s *semaphore) UpdateCapacity(size int32) error { if size < 0 || size > int32(cap(s.queue)) { return ErrUpdateCapacity } @@ -214,13 +214,13 @@ func (s *Semaphore) UpdateCapacity(size int32) error { // effectiveCapacity is the capacity with reducers taken into account. // `mux` must be held to call it. -func (s *Semaphore) effectiveCapacity() int32 { +func (s *semaphore) effectiveCapacity() int32 { return s.capacity - s.reducers } // Capacity is the effective capacity after taking reducers into // account. -func (s *Semaphore) Capacity() int32 { +func (s *semaphore) Capacity() int32 { s.mux.Lock() defer s.mux.Unlock() diff --git a/pkg/queue/breaker_test.go b/pkg/queue/breaker_test.go index 744395a08019..f993f08ea824 100644 --- a/pkg/queue/breaker_test.go +++ b/pkg/queue/breaker_test.go @@ -374,7 +374,7 @@ func assertEqual(want, got interface{}, t *testing.T) { } } -func tryAcquire(sem *Semaphore, gotChan chan struct{}) { +func tryAcquire(sem *semaphore, gotChan chan struct{}) { go func() { // blocking until someone puts the token into the semaphore sem.Acquire()