From 120b323bdb61f036b9fbf81f5f050717e9adcfd6 Mon Sep 17 00:00:00 2001 From: Aiden Date: Mon, 22 Aug 2022 08:03:03 +0100 Subject: [PATCH] Adds unbuffered worker (#13) * Adds unbuffered worker * remove interface method --- worker/buffered_pool.go | 3 ++- worker/unbuffered_pool.go | 43 ++++++++++++++++++++++++++++++++++ worker/unbuffered_pool_test.go | 32 +++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 worker/unbuffered_pool.go create mode 100644 worker/unbuffered_pool_test.go diff --git a/worker/buffered_pool.go b/worker/buffered_pool.go index 6472037..a568bd5 100644 --- a/worker/buffered_pool.go +++ b/worker/buffered_pool.go @@ -20,7 +20,7 @@ type BufferedPool interface { // compile time assertion var _ BufferedPool = (*BufferedPoolImpl)(nil) -// PoolImpl is an implementation that is compatible with Pool +// BufferedPoolImpl is an implementation that is compatible with Pool type BufferedPoolImpl struct { closeOnce sync.Once @@ -31,6 +31,7 @@ type BufferedPoolImpl struct { WaitGroup sync.WaitGroup } +// NewBufferedPool creates a new instance of BufferedPoolImpl func NewBufferedPool(workers, maxQueue int) *BufferedPoolImpl { w := &BufferedPoolImpl{ Tasks: make(chan func(), maxQueue), diff --git a/worker/unbuffered_pool.go b/worker/unbuffered_pool.go new file mode 100644 index 0000000..34f1f7a --- /dev/null +++ b/worker/unbuffered_pool.go @@ -0,0 +1,43 @@ +package worker + +import "sync" + +// UnbufferedPool is an interface which implements BufferedPoolImpl, allowing you to mock out the pool for tests. +type UnbufferedPool interface { + // Do enqueues a task and blocks the goroutine until it's enqueued. + Do(func()) + + // Wait blocks the goroutine until all tasks are complete. + Wait() +} + +// compile time assertion +var _ UnbufferedPool = (*UnbufferedPoolImpl)(nil) + +// UnbufferedPoolImpl is an implementation that is compatible with UnbufferedPool. +// +// The main purpose of this tool is to let you ensure all goroutines have been closed before exiting your app, you can +// pass an instance of this around everywhere instead. +type UnbufferedPoolImpl struct { + // WaitGroup tracks how many running/queued tasks there are, we expose Wait() so you can wait until all tasks are complete. + WaitGroup sync.WaitGroup +} + +// NewUnbufferedPool creates a new instance of BufferedPoolImpl +func NewUnbufferedPool() *UnbufferedPoolImpl { + return &UnbufferedPoolImpl{} +} + +// Do increments the wait group and invokes the goroutine, then decrements it. +func (w *UnbufferedPoolImpl) Do(cb func()) { + w.WaitGroup.Add(1) + go func(cb func()) { + cb() + w.WaitGroup.Done() + }(cb) +} + +// Wait blocks the goroutine until all tasks are complete. +func (w *UnbufferedPoolImpl) Wait() { + w.WaitGroup.Wait() +} diff --git a/worker/unbuffered_pool_test.go b/worker/unbuffered_pool_test.go new file mode 100644 index 0000000..4eb62c9 --- /dev/null +++ b/worker/unbuffered_pool_test.go @@ -0,0 +1,32 @@ +package worker_test + +import ( + "sync" + "testing" + + "github.com/aidenwallis/go-utils/worker" + "github.com/stretchr/testify/assert" +) + +func TestUnbufferedPool(t *testing.T) { + t.Parallel() + + p := worker.NewUnbufferedPool() + + var ( + m sync.Mutex + v int + ) + + for i := 0; i < 10; i++ { + p.Do(func() { + m.Lock() + v++ + m.Unlock() + }) + } + + p.Wait() + + assert.Equal(t, 10, v) +}