Skip to content

Commit

Permalink
Semaphore is the internal implementation detail of the breaker. (#3143)
Browse files Browse the repository at this point in the history
Hence there is no reason for it to be public.
  • Loading branch information
vagababov authored and knative-prow-robot committed Feb 9, 2019
1 parent 6ff096f commit 6f030f4
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
20 changes: 10 additions & 10 deletions pkg/queue/breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type token struct{}
// beyond the limit of the queue are failed immediately.
type Breaker struct {
pendingRequests chan token
sem *Semaphore
sem *semaphore
logger *zap.SugaredLogger
}

Expand Down Expand Up @@ -113,23 +113,23 @@ func (b *Breaker) Capacity() int32 {
// Maximal capacity is the size of the buffered channel, it defines maximum number of tokens
// in the rotation. Attempting to add more capacity then the max will result in error.
// Initial capacity is the initial number of free tokens.
func NewSemaphore(maxCapacity, initialCapacity int32) *Semaphore {
func NewSemaphore(maxCapacity, initialCapacity int32) *semaphore {
if initialCapacity < 0 || initialCapacity > maxCapacity {
panic(fmt.Sprintf("Initial capacity must be between 0 and maximal capacity. Got %v.", initialCapacity))
}
queue := make(chan token, maxCapacity)
sem := Semaphore{queue: queue}
sem := semaphore{queue: queue}
if initialCapacity > 0 {
sem.UpdateCapacity(initialCapacity)
}
return &sem
}

// Semaphore is an implementation of a semaphore based on Go channels.
// semaphore is an implementation of a semaphore based on Go channels.
// The presence of elements in the `queue` buffered channel correspond to available tokens.
// Hence the max number of tokens to hand out equals to the size of the channel.
// `capacity` defines the current number of tokens in the rotation.
type Semaphore struct {
type semaphore struct {
queue chan token
token token
reducers int32
Expand All @@ -138,14 +138,14 @@ type Semaphore struct {
}

// Acquire receives the token from the semaphore, potentially blocking.
func (s *Semaphore) Acquire() {
func (s *semaphore) Acquire() {
<-s.queue
}

// Release potentially puts the token back to the queue.
// If the semaphore capacity was reduced in between and is not yet reflected,
// we remove the tokens from the rotation instead of returning them back.
func (s *Semaphore) Release() error {
func (s *semaphore) Release() error {
s.mux.Lock()
defer s.mux.Unlock()

Expand All @@ -167,7 +167,7 @@ func (s *Semaphore) Release() error {

// UpdateCapacity updates the capacity of the semaphore to the desired
// size.
func (s *Semaphore) UpdateCapacity(size int32) error {
func (s *semaphore) UpdateCapacity(size int32) error {
if size < 0 || size > int32(cap(s.queue)) {
return ErrUpdateCapacity
}
Expand Down Expand Up @@ -214,13 +214,13 @@ func (s *Semaphore) UpdateCapacity(size int32) error {

// effectiveCapacity is the capacity with reducers taken into account.
// `mux` must be held to call it.
func (s *Semaphore) effectiveCapacity() int32 {
func (s *semaphore) effectiveCapacity() int32 {
return s.capacity - s.reducers
}

// Capacity is the effective capacity after taking reducers into
// account.
func (s *Semaphore) Capacity() int32 {
func (s *semaphore) Capacity() int32 {
s.mux.Lock()
defer s.mux.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/queue/breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func assertEqual(want, got interface{}, t *testing.T) {
}
}

func tryAcquire(sem *Semaphore, gotChan chan struct{}) {
func tryAcquire(sem *semaphore, gotChan chan struct{}) {
go func() {
// blocking until someone puts the token into the semaphore
sem.Acquire()
Expand Down

0 comments on commit 6f030f4

Please sign in to comment.