From 4fd7013ee15cf88130244f030a2f3a20f3ba031c Mon Sep 17 00:00:00 2001 From: Matthew Sykes Date: Tue, 8 Jan 2019 13:38:46 -0500 Subject: [PATCH] [FAB-13347] introduce counting semaphore The counting semaphore will be used as part of throttling implementation. Change-Id: Ie8741414a6b25373072a611a55620519051489cc Signed-off-by: Matthew Sykes (cherry picked from commit f975549503d76c276adfad52c1061901526bb410) --- common/semaphore/semaphore.go | 35 +++++++++++++++++ common/semaphore/semaphore_test.go | 63 ++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 common/semaphore/semaphore.go create mode 100644 common/semaphore/semaphore_test.go diff --git a/common/semaphore/semaphore.go b/common/semaphore/semaphore.go new file mode 100644 index 00000000000..9d83de88b8d --- /dev/null +++ b/common/semaphore/semaphore.go @@ -0,0 +1,35 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package semaphore + +import "context" + +type Semaphore chan struct{} + +func New(count int) Semaphore { + if count <= 0 { + panic("count must be greater than 0") + } + return make(chan struct{}, count) +} + +func (s Semaphore) Acquire(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + case s <- struct{}{}: + return nil + } +} + +func (s Semaphore) Release() { + select { + case <-s: + default: + panic("semaphore buffer is empty") + } +} diff --git a/common/semaphore/semaphore_test.go b/common/semaphore/semaphore_test.go new file mode 100644 index 00000000000..fc9c225a9d4 --- /dev/null +++ b/common/semaphore/semaphore_test.go @@ -0,0 +1,63 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package semaphore_test + +import ( + "context" + "testing" + + "github.com/hyperledger/fabric/common/semaphore" + . "github.com/onsi/gomega" + "github.com/stretchr/testify/assert" +) + +func TestNewSemaphorePanic(t *testing.T) { + assert.PanicsWithValue(t, "count must be greater than 0", func() { semaphore.New(0) }) +} + +func TestSemaphoreBlocking(t *testing.T) { + gt := NewGomegaWithT(t) + + sema := semaphore.New(5) + for i := 0; i < 5; i++ { + err := sema.Acquire(context.Background()) + gt.Expect(err).NotTo(HaveOccurred()) + } + + done := make(chan struct{}) + go func() { + err := sema.Acquire(context.Background()) + gt.Expect(err).NotTo(HaveOccurred()) + + close(done) + sema.Release() + }() + + gt.Consistently(done).ShouldNot(BeClosed()) + sema.Release() + gt.Eventually(done).Should(BeClosed()) +} + +func TestSemaphoreContextError(t *testing.T) { + gt := NewGomegaWithT(t) + + sema := semaphore.New(1) + err := sema.Acquire(context.Background()) + gt.Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + errCh := make(chan error, 1) + go func() { errCh <- sema.Acquire(ctx) }() + + gt.Eventually(errCh).Should(Receive(Equal(context.Canceled))) +} + +func TestSemaphoreReleaseTooMany(t *testing.T) { + sema := semaphore.New(1) + assert.PanicsWithValue(t, "semaphore buffer is empty", func() { sema.Release() }) +}