Skip to content

Commit

Permalink
[FAB-13347] introduce counting semaphore
Browse files Browse the repository at this point in the history
The counting semaphore will be used as part of throttling
implementation.

Change-Id: Ie8741414a6b25373072a611a55620519051489cc
Signed-off-by: Matthew Sykes <[email protected]>
(cherry picked from commit f975549)
  • Loading branch information
sykesm authored and mastersingh24 committed Jan 9, 2019
1 parent b43fcc7 commit 4fd7013
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
35 changes: 35 additions & 0 deletions common/semaphore/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package semaphore

import "context"

type Semaphore chan struct{}

func New(count int) Semaphore {
if count <= 0 {
panic("count must be greater than 0")
}
return make(chan struct{}, count)
}

func (s Semaphore) Acquire(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case s <- struct{}{}:
return nil
}
}

func (s Semaphore) Release() {
select {
case <-s:
default:
panic("semaphore buffer is empty")
}
}
63 changes: 63 additions & 0 deletions common/semaphore/semaphore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package semaphore_test

import (
"context"
"testing"

"github.com/hyperledger/fabric/common/semaphore"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
)

func TestNewSemaphorePanic(t *testing.T) {
assert.PanicsWithValue(t, "count must be greater than 0", func() { semaphore.New(0) })
}

func TestSemaphoreBlocking(t *testing.T) {
gt := NewGomegaWithT(t)

sema := semaphore.New(5)
for i := 0; i < 5; i++ {
err := sema.Acquire(context.Background())
gt.Expect(err).NotTo(HaveOccurred())
}

done := make(chan struct{})
go func() {
err := sema.Acquire(context.Background())
gt.Expect(err).NotTo(HaveOccurred())

close(done)
sema.Release()
}()

gt.Consistently(done).ShouldNot(BeClosed())
sema.Release()
gt.Eventually(done).Should(BeClosed())
}

func TestSemaphoreContextError(t *testing.T) {
gt := NewGomegaWithT(t)

sema := semaphore.New(1)
err := sema.Acquire(context.Background())
gt.Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
cancel()
errCh := make(chan error, 1)
go func() { errCh <- sema.Acquire(ctx) }()

gt.Eventually(errCh).Should(Receive(Equal(context.Canceled)))
}

func TestSemaphoreReleaseTooMany(t *testing.T) {
sema := semaphore.New(1)
assert.PanicsWithValue(t, "semaphore buffer is empty", func() { sema.Release() })
}

0 comments on commit 4fd7013

Please sign in to comment.